Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions engine/packages/api-public/src/runner_configs/upsert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,12 @@ async fn upsert_inner(
.iter()
.map(|dc| (dc.clone(), body.datacenters.remove(&dc.name)))
.collect::<Vec<_>>();

// Check for leftover datacenters in the body, this means those datacenters don't exist
if !body.datacenters.is_empty() {
return Err(crate::errors::Datacenter::NotFound.build());
}

let any_endpoint_config_changed = futures_util::stream::iter(dcs)
.map(|(dc, runner_config)| {
let ctx = ctx.clone();
Expand Down
2 changes: 2 additions & 0 deletions engine/packages/pegboard/src/workflows/runner2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ pub async fn pegboard_runner2(ctx: &mut WorkflowCtx, input: &Input) -> Result<()
})
.await?;

// NOTE: This is only if there's a queue of actors. this path is not used if there is enough capacity of
// runners, the actor wf allocates itself independently
// Check for pending actors
let res = ctx
.activity(AllocatePendingActorsInput {
Expand Down
106 changes: 46 additions & 60 deletions engine/packages/pegboard/src/workflows/serverless/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ struct RescheduleState {
#[workflow]
pub async fn pegboard_serverless_connection(ctx: &mut WorkflowCtx, input: &Input) -> Result<()> {
// Run the connection activity, which will handle the full lifecycle
let send_drain_started = ctx
let drain_sent = ctx
.loope(RescheduleState::default(), |ctx, state| {
let input = input.clone();

Expand All @@ -55,8 +55,8 @@ pub async fn pegboard_serverless_connection(ctx: &mut WorkflowCtx, input: &Input
})
.await?;

if let OutboundReqOutput::Done(res) = res {
return Ok(Loop::Break(res.send_drain_started));
if let OutboundReqOutput::Draining { drain_sent } = res {
return Ok(Loop::Break(drain_sent));
}

let mut backoff = reconnect_backoff(
Expand Down Expand Up @@ -87,7 +87,7 @@ pub async fn pegboard_serverless_connection(ctx: &mut WorkflowCtx, input: &Input
tracing::debug!("drain received during serverless connection backoff");

// Notify pool that drain started
return Ok(Loop::Break(true));
return Ok(Loop::Break(false));
}

Ok(Loop::Continue)
Expand All @@ -98,7 +98,7 @@ pub async fn pegboard_serverless_connection(ctx: &mut WorkflowCtx, input: &Input

// If we failed to send inline during the activity, durably ensure the
// signal is dispatched here
if send_drain_started {
if !drain_sent {
ctx.signal(pool::RunnerDrainStarted {
runner_wf_id: input.runner_wf_id,
})
Expand Down Expand Up @@ -141,44 +141,50 @@ struct OutboundReqInput {
runner_name: String,
}

#[derive(Debug, Serialize, Deserialize)]
struct OutboundReqInnerOutput {
send_drain_started: bool,
}

#[derive(Debug, Serialize, Deserialize)]
enum OutboundReqOutput {
Done(OutboundReqInnerOutput),
NeedsRetry,
Continue,
Draining {
/// Whether or not to retry sending the drain signal because it failed or was never sent.
drain_sent: bool,
},
Retry,
}

#[activity(OutboundReq)]
#[timeout = u64::MAX]
async fn outbound_req(ctx: &ActivityCtx, input: &OutboundReqInput) -> Result<OutboundReqOutput> {
match outbound_req_inner(ctx, input).await {
Ok(res) => Ok(OutboundReqOutput::Done(res)),
Err(error) => {
tracing::error!(?error, "outbound_req_inner failed, retrying after backoff");
Ok(OutboundReqOutput::NeedsRetry)
let mut term_signal = TermSignal::new().await;
let mut drain_sub = ctx
.subscribe::<Drain>(("workflow_id", ctx.workflow_id()))
.await?;

loop {
match outbound_req_inner(ctx, input, &mut term_signal, &mut drain_sub).await {
// If the outbound req exited successfully, continue with no backoff
Ok(OutboundReqOutput::Continue) => {}
Ok(OutboundReqOutput::Draining { drain_sent }) => {
return Ok(OutboundReqOutput::Draining { drain_sent });
}
Ok(OutboundReqOutput::Retry) => return Ok(OutboundReqOutput::Retry),
Err(error) => {
tracing::warn!(?error, "outbound_req_inner failed, retrying after backoff");
return Ok(OutboundReqOutput::Retry);
}
}
}
}

async fn outbound_req_inner(
ctx: &ActivityCtx,
input: &OutboundReqInput,
) -> Result<OutboundReqInnerOutput> {
term_signal: &mut TermSignal,
drain_sub: &mut message::SubscriptionHandle<Drain>,
) -> Result<OutboundReqOutput> {
if is_runner_draining(ctx, input.runner_wf_id).await? {
return Ok(OutboundReqInnerOutput {
send_drain_started: true,
});
return Ok(OutboundReqOutput::Draining { drain_sent: false });
}

let mut term_signal = TermSignal::new().await;
let mut drain_sub = ctx
.subscribe::<Drain>(("workflow_id", ctx.workflow_id()))
.await?;

let (runner_config_res, namespace_res) = tokio::try_join!(
ctx.op(crate::ops::runner_config::get::Input {
runners: vec![(input.namespace_id, input.runner_name.clone())],
Expand All @@ -190,9 +196,7 @@ async fn outbound_req_inner(
)?;
let Some(runner_config) = runner_config_res.into_iter().next() else {
tracing::debug!("runner config does not exist, ending outbound req");
return Ok(OutboundReqInnerOutput {
send_drain_started: true,
});
return Ok(OutboundReqOutput::Draining { drain_sent: false });
};

let RunnerConfigKind::Serverless {
Expand All @@ -204,9 +208,7 @@ async fn outbound_req_inner(
} = runner_config.config.kind
else {
tracing::debug!("runner config is not serverless, ending outbound req");
return Ok(OutboundReqInnerOutput {
send_drain_started: true,
});
return Ok(OutboundReqOutput::Draining { drain_sent: false });
};

let namespace = namespace_res
Expand Down Expand Up @@ -320,26 +322,20 @@ async fn outbound_req_inner(
Duration::from_secs(request_lifespan as u64).saturating_sub(DRAIN_GRACE_PERIOD);
tokio::select! {
res = stream_handler => {
return match res {
Err(e) => Err(e.into()),
// TODO:
// For unexpected closes, we don’t know if the runner connected
// or not bc we can’t correlate the runner id.
//
// Lifecycle state falls apart
Ok(_) => Ok(OutboundReqInnerOutput {
send_drain_started: false
})
};
match res {
// If the outbound req was stopped from the client side, we can just continue the loop
Ok(_) => return Ok(OutboundReqOutput::Continue),
Err(e) => return Err(e.into()),
}
},
_ = tokio::time::sleep(sleep_until_drain) => {}
_ = drain_sub.next() => {}
_ = term_signal.recv() => {}
};

tracing::debug!(?runner_id, "connection reached lifespan, needs draining");
tracing::debug!(?runner_id, "connection reached lifespan, starting drain");

if let Err(e) = ctx
if let Err(err) = ctx
.signal(pool::RunnerDrainStarted {
runner_wf_id: input.runner_wf_id,
})
Expand All @@ -349,17 +345,9 @@ async fn outbound_req_inner(
.send()
.await
{
tracing::warn!(
runner_name=%input.runner_name.clone(),
namespace_id=%input.namespace_id,
workflow_id=%ctx.workflow_id(),
"failed to send signal: {}", e
);

// If we failed to send, have the workflow send it durably
return Ok(OutboundReqInnerOutput {
send_drain_started: true,
});
tracing::debug!(?err, "failed to send drain signal");

return Ok(OutboundReqOutput::Draining { drain_sent: false });
}

// After we tell the pool we're draining, any remaining failures
Expand All @@ -371,9 +359,7 @@ async fn outbound_req_inner(
tracing::debug!(?err, "failed non critical draining phase");
}

Ok(OutboundReqInnerOutput {
send_drain_started: false,
})
Ok(OutboundReqOutput::Draining { drain_sent: true })
}

/// Reads from the adjacent serverless runner wf which is keeping track of signals while this workflow runs
Expand All @@ -394,7 +380,7 @@ async fn is_runner_draining(ctx: &ActivityCtx, runner_wf_id: Id) -> Result<bool>
#[tracing::instrument(skip_all)]
async fn finish_non_critical_draining(
ctx: &ActivityCtx,
mut term_signal: TermSignal,
term_signal: &mut TermSignal,
mut source: sse::EventSource,
mut runner_id: Option<Id>,
mut runner_protocol_version: Option<u16>,
Expand Down
27 changes: 21 additions & 6 deletions engine/sdks/typescript/runner/src/mod.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import * as protocol from "@rivetkit/engine-runner-protocol";

Check failure on line 1 in engine/sdks/typescript/runner/src/mod.ts

View workflow job for this annotation

GitHub Actions / quality

format

Formatter would have printed the following content:
import type { Logger } from "pino";
import type WebSocket from "ws";
import { type ActorConfig, RunnerActor } from "./actor";
Expand Down Expand Up @@ -302,6 +302,25 @@
this.#sendActorStateUpdate(actorId, actor.generation, "stopped");
}

#handleLost() {
this.log?.info({
msg: "stopping all actors due to runner lost threshold",
});

// Remove all remaining kv requests
for (const [_, request] of this.#kvRequests.entries()) {
request.reject(
new Error(
"KV request timed out waiting for WebSocket connection",
),
);
}

this.#kvRequests.clear();

this.#stopAllActors();
}

#stopAllActors() {
const actorIds = Array.from(this.#actors.keys());
for (const actorId of actorIds) {
Expand Down Expand Up @@ -837,11 +856,7 @@
seconds: this.#runnerLostThreshold / 1000,
});
this.#runnerLostTimeout = setTimeout(() => {
this.log?.info({
msg: "stopping all actors due to runner lost threshold",
});

this.#stopAllActors();
this.#handleLost();
}, this.#runnerLostThreshold);
}

Expand Down Expand Up @@ -897,7 +912,7 @@
seconds: this.#runnerLostThreshold / 1000,
});
this.#runnerLostTimeout = setTimeout(() => {
this.#stopAllActors();
this.#handleLost();
}, this.#runnerLostThreshold);
}

Expand Down Expand Up @@ -930,10 +945,10 @@
}

#handleAckEvents(ack: protocol.ToClientAckEvents) {
let originalTotalEvents = Array.from(this.#actors).reduce((s, [_, actor]) => s + actor.eventHistory.length, 0);

Check warning on line 948 in engine/sdks/typescript/runner/src/mod.ts

View workflow job for this annotation

GitHub Actions / quality

lint/style/useConst

This let declares a variable that is only assigned once.

for (const [_, actor] of this.#actors) {
let checkpoint = ack.lastEventCheckpoints.find(x => x.actorId == actor.actorId);

Check failure on line 951 in engine/sdks/typescript/runner/src/mod.ts

View workflow job for this annotation

GitHub Actions / quality

lint/suspicious/noDoubleEquals

Using == may be unsafe if you are relying on type coercion.

Check warning on line 951 in engine/sdks/typescript/runner/src/mod.ts

View workflow job for this annotation

GitHub Actions / quality

lint/style/useConst

This let declares a variable that is only assigned once.

if (checkpoint) actor.handleAckEvents(checkpoint.index);
}
Expand All @@ -960,7 +975,7 @@

actor.recordEvent(eventWrapper);

let totalEvents = Array.from(this.#actors).reduce((s, [_, actor]) => s + actor.eventHistory.length, 0);

Check warning on line 978 in engine/sdks/typescript/runner/src/mod.ts

View workflow job for this annotation

GitHub Actions / quality

lint/style/useConst

This let declares a variable that is only assigned once.

if (
totalEvents > EVENT_BACKLOG_WARN_THRESHOLD &&
Expand Down
7 changes: 2 additions & 5 deletions examples/counter/scripts/connect.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 3 additions & 25 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion rivetkit-asyncapi/asyncapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"asyncapi": "3.0.0",
"info": {
"title": "RivetKit WebSocket Protocol",
"version": "2.0.25-rc.2",
"version": "2.0.25",
"description": "WebSocket protocol for bidirectional communication between RivetKit clients and actors"
},
"channels": {
Expand Down
4 changes: 2 additions & 2 deletions rivetkit-openapi/openapi.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"openapi": "3.0.0",
"info": {
"version": "2.0.25-rc.2",
"version": "2.0.25",
"title": "RivetKit API"
},
"components": {
Expand Down Expand Up @@ -666,4 +666,4 @@
}
}
}
}
}
Loading
Loading