diff --git a/engine/packages/actor-kv/src/lib.rs b/engine/packages/actor-kv/src/lib.rs index 46e9735bfa..003daf6e01 100644 --- a/engine/packages/actor-kv/src/lib.rs +++ b/engine/packages/actor-kv/src/lib.rs @@ -260,7 +260,7 @@ pub async fn put( &EntryMetadataKey::new(key.clone()), rp::KvMetadata { version: VERSION.as_bytes().to_vec(), - create_ts: utils::now(), + update_ts: utils::now(), }, )?; diff --git a/engine/packages/api-peer/src/actors/kv_get.rs b/engine/packages/api-peer/src/actors/kv_get.rs index 7f81aa7563..00b61c9540 100644 --- a/engine/packages/api-peer/src/actors/kv_get.rs +++ b/engine/packages/api-peer/src/actors/kv_get.rs @@ -60,8 +60,6 @@ pub async fn kv_get(ctx: ApiCtx, path: KvGetPath, _query: KvGetQuery) -> Result< Ok(KvGetResponse { value: value_base64, - // NOTE: Intentionally uses different name in public API. `create_ts` is actually - // `update_ts`. - update_ts: metadata[0].create_ts, + update_ts: metadata[0].update_ts, }) } diff --git a/engine/packages/pegboard-runner/src/actor_event_demuxer.rs b/engine/packages/pegboard-runner/src/actor_event_demuxer.rs index 8dadd18916..2b5c7c743f 100644 --- a/engine/packages/pegboard-runner/src/actor_event_demuxer.rs +++ b/engine/packages/pegboard-runner/src/actor_event_demuxer.rs @@ -37,29 +37,20 @@ impl ActorEventDemuxer { pub fn ingest(&mut self, actor_id: Id, event: protocol::mk2::EventWrapper) { tracing::debug!(runner_id=?self.runner_id, ?actor_id, index=?event.checkpoint.index, "actor demuxer ingest"); - if let Some(channel) = self.channels.get(&actor_id) { + if let Some(channel) = self.channels.get_mut(&actor_id) { let _ = channel.tx.send(event); + channel.last_seen = Instant::now(); } else { - let (tx, mut rx) = mpsc::unbounded_channel(); - - let ctx = self.ctx.clone(); - let runner_id = self.runner_id; - let handle = tokio::spawn(async move { - loop { - let mut buffer = Vec::new(); - - // Batch process events - if rx.recv_many(&mut buffer, 1024).await == 0 { - break; - } - - if let Err(err) = dispatch_events(&ctx, runner_id, actor_id, buffer).await { - tracing::error!(?err, "actor event processor failed"); - break; - } - } - }); + let (tx, rx) = mpsc::unbounded_channel(); + + let handle = tokio::spawn(channel_handler( + self.ctx.clone(), + self.runner_id, + actor_id, + rx, + )); + // Send initial event let _ = tx.send(event); self.channels.insert( @@ -110,6 +101,27 @@ impl ActorEventDemuxer { } } +async fn channel_handler( + ctx: StandaloneCtx, + runner_id: Id, + actor_id: Id, + mut rx: mpsc::UnboundedReceiver, +) { + loop { + let mut buffer = Vec::new(); + + // Batch process events + if rx.recv_many(&mut buffer, 1024).await == 0 { + break; + } + + if let Err(err) = dispatch_events(&ctx, runner_id, actor_id, buffer).await { + tracing::error!(?err, "actor event processor failed"); + break; + } + } +} + #[tracing::instrument(skip_all, fields(?runner_id, ?actor_id))] async fn dispatch_events( ctx: &StandaloneCtx, diff --git a/engine/packages/pegboard-runner/src/ws_to_tunnel_task.rs b/engine/packages/pegboard-runner/src/ws_to_tunnel_task.rs index 261f02c2c2..3b2fcca0ea 100644 --- a/engine/packages/pegboard-runner/src/ws_to_tunnel_task.rs +++ b/engine/packages/pegboard-runner/src/ws_to_tunnel_task.rs @@ -558,7 +558,7 @@ async fn handle_message_mk1(ctx: &StandaloneCtx, conn: &Conn, msg: Bytes) -> Res .into_iter() .map(|x| protocol::KvMetadata { version: x.version, - create_ts: x.create_ts, + create_ts: x.update_ts, }) .collect(), }, @@ -626,7 +626,7 @@ async fn handle_message_mk1(ctx: &StandaloneCtx, conn: &Conn, msg: Bytes) -> Res .into_iter() .map(|x| protocol::KvMetadata { version: x.version, - create_ts: x.create_ts, + create_ts: x.update_ts, }) .collect(), }, diff --git a/engine/packages/pegboard/src/lib.rs b/engine/packages/pegboard/src/lib.rs index a64263687b..59e107384a 100644 --- a/engine/packages/pegboard/src/lib.rs +++ b/engine/packages/pegboard/src/lib.rs @@ -15,9 +15,9 @@ pub fn registry() -> WorkflowResult { registry.register_workflow::()?; registry.register_workflow::()?; registry.register_workflow::()?; - registry.register_workflow::()?; - registry.register_workflow::()?; - registry.register_workflow::()?; + registry.register_workflow::()?; + registry.register_workflow::()?; + registry.register_workflow::()?; Ok(registry) } diff --git a/engine/packages/pegboard/src/ops/runner_config/delete.rs b/engine/packages/pegboard/src/ops/runner_config/delete.rs index ae78039ffd..7c19fc0ca4 100644 --- a/engine/packages/pegboard/src/ops/runner_config/delete.rs +++ b/engine/packages/pegboard/src/ops/runner_config/delete.rs @@ -11,7 +11,7 @@ pub struct Input { #[operation] pub async fn pegboard_runner_config_delete(ctx: &OperationCtx, input: &Input) -> Result<()> { - let delete_workflow = ctx + let delete_pool = ctx .udb()? .run(|tx| async move { let tx = tx.with_subspace(namespace::keys::subspace()); @@ -20,7 +20,7 @@ pub async fn pegboard_runner_config_delete(ctx: &OperationCtx, input: &Input) -> let runner_config_key = keys::runner_config::DataKey::new(input.namespace_id, input.name.clone()); - let delete_workflow = + let delete_pool = if let Some(config) = tx.read_opt(&runner_config_key, Serializable).await? { tx.delete(&runner_config_key); @@ -32,20 +32,20 @@ pub async fn pegboard_runner_config_delete(ctx: &OperationCtx, input: &Input) -> input.name.clone(), )); - config.affects_autoscaler() + config.affects_pool() } else { false }; - Ok(delete_workflow) + Ok(delete_pool) }) .custom_instrument(tracing::info_span!("runner_config_delete_tx")) .await?; - // Bump autoscaler when a serverless config is modified - if delete_workflow { - ctx.signal(crate::workflows::serverless::pool::Bump {}) - .to_workflow::() + // Bump pool when a serverless config is modified + if delete_pool { + ctx.signal(crate::workflows::runner_pool::Bump {}) + .to_workflow::() .tag("namespace_id", input.namespace_id) .tag("runner_name", input.name.clone()) .send() diff --git a/engine/packages/pegboard/src/ops/runner_config/upsert.rs b/engine/packages/pegboard/src/ops/runner_config/upsert.rs index dc5235cf56..9c4f00ad72 100644 --- a/engine/packages/pegboard/src/ops/runner_config/upsert.rs +++ b/engine/packages/pegboard/src/ops/runner_config/upsert.rs @@ -13,7 +13,7 @@ pub struct Input { struct UpsertOutput { endpoint_config_changed: bool, - serverless_runner_created: bool, + pool_created: bool, } #[operation] @@ -52,20 +52,20 @@ pub async fn pegboard_runner_config_upsert(ctx: &OperationCtx, input: &Input) -> }, ) => UpsertOutput { endpoint_config_changed: old_url != new_url || old_headers != new_headers, - serverless_runner_created: false, + pool_created: false, }, (RunnerConfigKind::Normal { .. }, RunnerConfigKind::Serverless { .. }) => { // Config type changed to serverless UpsertOutput { endpoint_config_changed: true, - serverless_runner_created: true, + pool_created: true, } } _ => { // Not serverless UpsertOutput { endpoint_config_changed: true, - serverless_runner_created: false, + pool_created: false, } } } @@ -73,10 +73,7 @@ pub async fn pegboard_runner_config_upsert(ctx: &OperationCtx, input: &Input) -> // New config UpsertOutput { endpoint_config_changed: true, - serverless_runner_created: matches!( - input.config.kind, - RunnerConfigKind::Serverless { .. } - ), + pool_created: matches!(input.config.kind, RunnerConfigKind::Serverless { .. }), } }; @@ -161,9 +158,8 @@ pub async fn pegboard_runner_config_upsert(ctx: &OperationCtx, input: &Input) -> .await? .map_err(|err| err.build())?; - // Bump autoscaler - if res.serverless_runner_created { - ctx.workflow(crate::workflows::serverless::pool::Input { + if res.pool_created { + ctx.workflow(crate::workflows::runner_pool::Input { namespace_id: input.namespace_id, runner_name: input.name.clone(), }) @@ -172,10 +168,9 @@ pub async fn pegboard_runner_config_upsert(ctx: &OperationCtx, input: &Input) -> .unique() .dispatch() .await?; - } else if input.config.affects_autoscaler() { - // Maybe scale it - ctx.signal(crate::workflows::serverless::pool::Bump {}) - .to_workflow::() + } else if input.config.affects_pool() { + ctx.signal(crate::workflows::runner_pool::Bump {}) + .to_workflow::() .tag("namespace_id", input.namespace_id) .tag("runner_name", input.name.clone()) .send() diff --git a/engine/packages/pegboard/src/workflows/actor/destroy.rs b/engine/packages/pegboard/src/workflows/actor/destroy.rs index 1c1be3deda..d73493ed20 100644 --- a/engine/packages/pegboard/src/workflows/actor/destroy.rs +++ b/engine/packages/pegboard/src/workflows/actor/destroy.rs @@ -30,7 +30,7 @@ pub(crate) async fn pegboard_actor_destroy(ctx: &mut WorkflowCtx, input: &Input) }) .await?; - // If a slot was allocated at the time of actor destruction then bump the serverless autoscaler so it can scale down + // If a slot was allocated at the time of actor destruction then bump the runner pool so it can scale down // if needed if res.allocated_serverless_slot { ctx.removed::>() @@ -38,8 +38,8 @@ pub(crate) async fn pegboard_actor_destroy(ctx: &mut WorkflowCtx, input: &Input) let bump_res = ctx .v(2) - .signal(crate::workflows::serverless::pool::Bump {}) - .to_workflow::() + .signal(crate::workflows::runner_pool::Bump {}) + .to_workflow::() .tag("namespace_id", input.namespace_id) .tag("runner_name", res.runner_name_selector.clone()) .send() diff --git a/engine/packages/pegboard/src/workflows/actor/mod.rs b/engine/packages/pegboard/src/workflows/actor/mod.rs index a9fa2f5bc2..37a9e209bf 100644 --- a/engine/packages/pegboard/src/workflows/actor/mod.rs +++ b/engine/packages/pegboard/src/workflows/actor/mod.rs @@ -842,15 +842,15 @@ async fn handle_stopped( .await?; if allocate_pending_res.allocations.is_empty() { - // Bump autoscaler so it can scale down if needed + // Bump pool so it can scale down if needed if deallocate_res.for_serverless { ctx.removed::>() .await?; let res = ctx .v(2) - .signal(crate::workflows::serverless::pool::Bump {}) - .to_workflow::() + .signal(crate::workflows::runner_pool::Bump {}) + .to_workflow::() .tag("namespace_id", input.namespace_id) .tag("runner_name", input.runner_name_selector.clone()) .send() diff --git a/engine/packages/pegboard/src/workflows/actor/runtime.rs b/engine/packages/pegboard/src/workflows/actor/runtime.rs index 79316bb922..268d3fcde8 100644 --- a/engine/packages/pegboard/src/workflows/actor/runtime.rs +++ b/engine/packages/pegboard/src/workflows/actor/runtime.rs @@ -637,12 +637,12 @@ pub async fn spawn_actor( ctx.removed::>() .await?; - // Bump the autoscaler so it can scale up + // Bump the pool so it can scale up if allocate_res.serverless { let res = ctx .v(2) - .signal(crate::workflows::serverless::pool::Bump {}) - .to_workflow::() + .signal(crate::workflows::runner_pool::Bump {}) + .to_workflow::() .tag("namespace_id", input.namespace_id) .tag("runner_name", input.runner_name_selector.clone()) .send() @@ -728,12 +728,12 @@ pub async fn spawn_actor( ctx.removed::>() .await?; - // Bump the autoscaler so it can scale up + // Bump the pool so it can scale up if allocate_res.serverless { let res = ctx .v(2) - .signal(crate::workflows::serverless::pool::Bump {}) - .to_workflow::() + .signal(crate::workflows::runner_pool::Bump {}) + .to_workflow::() .tag("namespace_id", input.namespace_id) .tag("runner_name", input.runner_name_selector.clone()) .send() diff --git a/engine/packages/pegboard/src/workflows/mod.rs b/engine/packages/pegboard/src/workflows/mod.rs index e8f47492b6..0c5741eb66 100644 --- a/engine/packages/pegboard/src/workflows/mod.rs +++ b/engine/packages/pegboard/src/workflows/mod.rs @@ -1,4 +1,5 @@ pub mod actor; pub mod runner; pub mod runner2; +pub mod runner_pool; pub mod serverless; diff --git a/engine/packages/pegboard/src/workflows/runner.rs b/engine/packages/pegboard/src/workflows/runner.rs index 7d665b480f..4d06a657b5 100644 --- a/engine/packages/pegboard/src/workflows/runner.rs +++ b/engine/packages/pegboard/src/workflows/runner.rs @@ -131,6 +131,7 @@ pub async fn pegboard_runner(ctx: &mut WorkflowCtx, input: &Input) -> Result<()> .await?; } + // Check for pending actors (which happen when there is not enough runner capacity) let res = ctx .activity(AllocatePendingActorsInput { namespace_id: input.namespace_id, @@ -282,7 +283,7 @@ pub async fn pegboard_runner(ctx: &mut WorkflowCtx, input: &Input) -> Result<()> } } Some(Main::CheckQueue(_)) => { - // Check for pending actors + // Check for pending actors (which happen when there is not enough runner capacity) let res = ctx .activity(AllocatePendingActorsInput { namespace_id: input.namespace_id, diff --git a/engine/packages/pegboard/src/workflows/runner2.rs b/engine/packages/pegboard/src/workflows/runner2.rs index 135d3d2056..fe39291caa 100644 --- a/engine/packages/pegboard/src/workflows/runner2.rs +++ b/engine/packages/pegboard/src/workflows/runner2.rs @@ -52,9 +52,7 @@ pub async fn pegboard_runner2(ctx: &mut WorkflowCtx, input: &Input) -> Result<() }) .await?; - // NOTE: This is only if there's a queue of actors. this path is not used if there is enough capacity of - // runners, the actor wf allocates itself independently - // Check for pending actors + // Check for pending actors (which happen when there is not enough runner capacity) let res = ctx .activity(AllocatePendingActorsInput { namespace_id: input.namespace_id, @@ -90,7 +88,7 @@ pub async fn pegboard_runner2(ctx: &mut WorkflowCtx, input: &Input) -> Result<() } } Some(Main::CheckQueue(_)) => { - // Check for pending actors + // Check for pending actors (which happen when there is not enough runner capacity) let res = ctx .activity(AllocatePendingActorsInput { namespace_id: input.namespace_id, diff --git a/engine/packages/pegboard/src/workflows/serverless/pool.rs b/engine/packages/pegboard/src/workflows/runner_pool.rs similarity index 67% rename from engine/packages/pegboard/src/workflows/serverless/pool.rs rename to engine/packages/pegboard/src/workflows/runner_pool.rs index ec657c0744..d1bb91120c 100644 --- a/engine/packages/pegboard/src/workflows/serverless/pool.rs +++ b/engine/packages/pegboard/src/workflows/runner_pool.rs @@ -1,10 +1,10 @@ use std::hash::{DefaultHasher, Hash, Hasher}; use futures_util::FutureExt; -use gas::{db::WorkflowData, prelude::*}; +use gas::prelude::*; use rivet_types::{keys, runner_configs::RunnerConfigKind}; -use super::runner; +use super::serverless; #[derive(Debug, Serialize, Deserialize, Clone)] pub struct Input { @@ -19,13 +19,12 @@ struct LifecycleState { #[derive(Debug, Serialize, Deserialize)] struct RunnerState { - /// Serverless runner wf id, not normal runner wf id. - runner_wf_id: Id, + receiver_wf_id: Id, details_hash: u64, } #[workflow] -pub async fn pegboard_serverless_pool(ctx: &mut WorkflowCtx, input: &Input) -> Result<()> { +pub async fn pegboard_runner_pool(ctx: &mut WorkflowCtx, input: &Input) -> Result<()> { ctx.loope(LifecycleState::default(), |ctx, state| { let input = input.clone(); async move { @@ -42,17 +41,6 @@ pub async fn pegboard_serverless_pool(ctx: &mut WorkflowCtx, input: &Input) -> R return Ok(Loop::Break(())); }; - let completed_runners = ctx - .activity(GetCompletedInput { - runners: state.runners.iter().map(|r| r.runner_wf_id).collect(), - }) - .await?; - - // Remove completed connections - state - .runners - .retain(|r| !completed_runners.contains(&r.runner_wf_id)); - // Remove runners that have an outdated hash. This is done outside of the below draining mechanism // because we drain specific runners, not just a number of runners let (new, outdated) = std::mem::take(&mut state.runners) @@ -61,8 +49,9 @@ pub async fn pegboard_serverless_pool(ctx: &mut WorkflowCtx, input: &Input) -> R state.runners = new; for runner in outdated { - ctx.signal(runner::Drain {}) - .to_workflow_id(runner.runner_wf_id) + // TODO: Spawn sub wf to process these so this is not blocking the loop + ctx.signal(serverless::receiver::Drain {}) + .to_workflow_id(runner.receiver_wf_id) .send() .await?; } @@ -75,9 +64,10 @@ pub async fn pegboard_serverless_pool(ctx: &mut WorkflowCtx, input: &Input) -> R // TODO: Implement smart logic of draining runners with the lowest allocated actors let draining_runners = state.runners.iter().take(drain_count).collect::>(); + // TODO: Spawn sub wf to process these so this is not blocking the loop for runner in draining_runners { - ctx.signal(runner::Drain {}) - .to_workflow_id(runner.runner_wf_id) + ctx.signal(serverless::receiver::Drain {}) + .to_workflow_id(runner.receiver_wf_id) .send() .await?; } @@ -85,9 +75,10 @@ pub async fn pegboard_serverless_pool(ctx: &mut WorkflowCtx, input: &Input) -> R // Dispatch new runner workflows if start_count != 0 { + // TODO: Spawn sub wf to process these so this is not blocking the loop for _ in 0..start_count { - let runner_wf_id = ctx - .workflow(runner::Input { + let receiver_wf_id = ctx + .workflow(serverless::receiver::Input { pool_wf_id: ctx.workflow_id(), namespace_id: input.namespace_id, runner_name: input.runner_name.clone(), @@ -98,18 +89,22 @@ pub async fn pegboard_serverless_pool(ctx: &mut WorkflowCtx, input: &Input) -> R .await?; state.runners.push(RunnerState { - runner_wf_id, + receiver_wf_id, details_hash, }); } } - // Wait for Bump or runner update signals until we tick again - match ctx.listen::
().await? { - Main::RunnerDrainStarted(sig) => { - state.runners.retain(|r| r.runner_wf_id != sig.runner_wf_id); + // Wait for Bump or serverless signals until we tick again + for sig in ctx.listen_n::
(1024).await? { + match sig { + Main::OutboundConnDrainStarted(sig) => { + state + .runners + .retain(|r| r.receiver_wf_id != sig.receiver_wf_id); + } + Main::Bump(_) => {} } - Main::Bump(_) => {} } Ok(Loop::Continue) @@ -121,23 +116,6 @@ pub async fn pegboard_serverless_pool(ctx: &mut WorkflowCtx, input: &Input) -> R Ok(()) } -#[derive(Debug, Serialize, Deserialize, Hash)] -struct GetCompletedInput { - runners: Vec, -} - -#[activity(GetCompleted)] -async fn get_completed(ctx: &ActivityCtx, input: &GetCompletedInput) -> Result> { - Ok(ctx - .get_workflows(input.runners.clone()) - .await? - .into_iter() - // When a workflow has output, it means it has completed - .filter(WorkflowData::has_output) - .map(|wf| wf.workflow_id) - .collect()) -} - #[derive(Debug, Serialize, Deserialize, Hash)] struct ReadDesiredInput { namespace_id: Id, @@ -155,12 +133,28 @@ enum ReadDesiredOutput { #[activity(ReadDesired)] async fn read_desired(ctx: &ActivityCtx, input: &ReadDesiredInput) -> Result { - let runner_config_res = ctx - .op(crate::ops::runner_config::get::Input { + let udb_pool = ctx.udb()?; + let (runner_config_res, desired_slots) = tokio::try_join!( + ctx.op(crate::ops::runner_config::get::Input { runners: vec![(input.namespace_id, input.runner_name.clone())], bypass_cache: false, - }) - .await?; + }), + udb_pool.run(|tx| async move { + let tx = tx.with_subspace(keys::pegboard::subspace()); + + let desired_slots = tx + .read_opt( + &keys::pegboard::ns::ServerlessDesiredSlotsKey { + namespace_id: input.namespace_id, + runner_name: input.runner_name.clone(), + }, + universaldb::utils::IsolationLevel::Serializable, + ) + .await?; + + Ok(desired_slots.unwrap_or_default()) + }), + )?; let Some(runner_config) = runner_config_res.into_iter().next() else { return Ok(ReadDesiredOutput::Stop); }; @@ -179,22 +173,6 @@ async fn read_desired(ctx: &ActivityCtx, input: &ReadDesiredInput) -> Result Result Result<()> { +pub async fn pegboard_serverless_conn(ctx: &mut WorkflowCtx, input: &Input) -> Result<()> { // Run the connection activity, which will handle the full lifecycle let drain_sent = ctx .loope(RescheduleState::default(), |ctx, state| { @@ -49,7 +49,7 @@ pub async fn pegboard_serverless_connection(ctx: &mut WorkflowCtx, input: &Input let res = ctx .activity(OutboundReqInput { pool_wf_id: input.pool_wf_id, - runner_wf_id: input.runner_wf_id, + receiver_wf_id: input.receiver_wf_id, namespace_id: input.namespace_id, runner_name: input.runner_name.clone(), }) @@ -99,8 +99,8 @@ pub async fn pegboard_serverless_connection(ctx: &mut WorkflowCtx, input: &Input // If we failed to send inline during the activity, durably ensure the // signal is dispatched here if !drain_sent { - ctx.signal(pool::RunnerDrainStarted { - runner_wf_id: input.runner_wf_id, + ctx.signal(runner_pool::OutboundConnDrainStarted { + receiver_wf_id: input.receiver_wf_id, }) .to_workflow_id(input.pool_wf_id) .send() @@ -136,7 +136,7 @@ async fn compare_retry(ctx: &ActivityCtx, input: &CompareRetryInput) -> Result, ) -> Result { - if is_runner_draining(ctx, input.runner_wf_id).await? { + if is_runner_draining(ctx, input.receiver_wf_id).await? { return Ok(OutboundReqOutput::Draining { drain_sent: false }); } @@ -336,8 +336,8 @@ async fn outbound_req_inner( tracing::debug!(?runner_id, "connection reached lifespan, starting drain"); if let Err(err) = ctx - .signal(pool::RunnerDrainStarted { - runner_wf_id: input.runner_wf_id, + .signal(runner_pool::OutboundConnDrainStarted { + receiver_wf_id: input.receiver_wf_id, }) // This is ok, because we only send DrainStarted once .bypass_signal_from_workflow_I_KNOW_WHAT_IM_DOING() @@ -365,14 +365,14 @@ async fn outbound_req_inner( /// Reads from the adjacent serverless runner wf which is keeping track of signals while this workflow runs /// outbound requests. #[tracing::instrument(skip_all)] -async fn is_runner_draining(ctx: &ActivityCtx, runner_wf_id: Id) -> Result { - let runner_wf = ctx - .get_workflows(vec![runner_wf_id]) +async fn is_runner_draining(ctx: &ActivityCtx, receiver_wf_id: Id) -> Result { + let receiver_wf = ctx + .get_workflows(vec![receiver_wf_id]) .await? .into_iter() .next() .context("cannot find own runner wf")?; - let state = runner_wf.parse_state::()?; + let state = receiver_wf.parse_state::()?; Ok(state.is_draining) } @@ -513,8 +513,8 @@ async fn publish_to_client_stop( Ok(()) } -#[message("pegboard_serverless_connection_drain")] -#[signal("pegboard_serverless_connection_drain")] +#[message("pegboard_serverless_conn_drain")] +#[signal("pegboard_serverless_conn_drain")] pub struct Drain {} fn reconnect_backoff( diff --git a/engine/packages/pegboard/src/workflows/serverless/mod.rs b/engine/packages/pegboard/src/workflows/serverless/mod.rs index 6e37f560c3..605d08ba1b 100644 --- a/engine/packages/pegboard/src/workflows/serverless/mod.rs +++ b/engine/packages/pegboard/src/workflows/serverless/mod.rs @@ -1,3 +1,2 @@ -pub mod connection; -pub mod pool; -pub mod runner; +pub mod conn; +pub mod receiver; diff --git a/engine/packages/pegboard/src/workflows/serverless/runner.rs b/engine/packages/pegboard/src/workflows/serverless/receiver.rs similarity index 78% rename from engine/packages/pegboard/src/workflows/serverless/runner.rs rename to engine/packages/pegboard/src/workflows/serverless/receiver.rs index 55eab09f75..d96c310200 100644 --- a/engine/packages/pegboard/src/workflows/serverless/runner.rs +++ b/engine/packages/pegboard/src/workflows/serverless/receiver.rs @@ -1,6 +1,6 @@ use gas::prelude::*; -use super::connection; +use super::conn; #[derive(Debug, Serialize, Deserialize, Clone)] pub struct Input { @@ -23,13 +23,13 @@ impl State { /// Runs alongside the connection workflow to process signals. This is required because the connection /// workflow cannot listen for signals while in an activity. #[workflow] -pub async fn pegboard_serverless_runner(ctx: &mut WorkflowCtx, input: &Input) -> Result<()> { +pub async fn pegboard_serverless_receiver(ctx: &mut WorkflowCtx, input: &Input) -> Result<()> { ctx.activity(InitStateInput {}).await?; let conn_wf_id = ctx - .workflow(connection::Input { + .workflow(conn::Input { pool_wf_id: input.pool_wf_id, - runner_wf_id: ctx.workflow_id(), + receiver_wf_id: ctx.workflow_id(), namespace_id: input.namespace_id, runner_name: input.runner_name.clone(), }) @@ -40,20 +40,20 @@ pub async fn pegboard_serverless_runner(ctx: &mut WorkflowCtx, input: &Input) -> ctx.activity(MarkAsDrainingInput {}).await?; - ctx.signal(connection::Drain {}) + // If the connection is between retries, this will be received + ctx.signal(conn::Drain {}) .to_workflow_id(conn_wf_id) .send() .await?; - ctx.msg(connection::Drain {}) + // if the connection is currently running an outbound req, this will be received + ctx.msg(conn::Drain {}) .tag("workflow_id", conn_wf_id) .send() .await?; // Wait for connection wf to complete so this wf's state remains readable - ctx.workflow::(conn_wf_id) - .output() - .await?; + ctx.workflow::(conn_wf_id).output().await?; Ok(()) } diff --git a/engine/packages/types/src/runner_configs.rs b/engine/packages/types/src/runner_configs.rs index 3cb4dd18f5..b7ba3fa4d3 100644 --- a/engine/packages/types/src/runner_configs.rs +++ b/engine/packages/types/src/runner_configs.rs @@ -89,8 +89,8 @@ impl From for R } impl RunnerConfig { - /// If updates to this run config affects the autoscaler. - pub fn affects_autoscaler(&self) -> bool { + /// If updates to this run config affects the pool. + pub fn affects_pool(&self) -> bool { matches!(self.kind, RunnerConfigKind::Serverless { .. }) } } diff --git a/engine/sdks/schemas/runner-protocol/v4.bare b/engine/sdks/schemas/runner-protocol/v4.bare index a91d1dfd00..1e85a45e95 100644 --- a/engine/sdks/schemas/runner-protocol/v4.bare +++ b/engine/sdks/schemas/runner-protocol/v4.bare @@ -16,8 +16,7 @@ type KvKey data type KvValue data type KvMetadata struct { version: data - # TODO: Rename to update_ts - createTs: i64 + updateTs: i64 } # Query types diff --git a/engine/sdks/typescript/runner-protocol/src/index.ts b/engine/sdks/typescript/runner-protocol/src/index.ts index 8a47f762d8..4bcdcc13ab 100644 --- a/engine/sdks/typescript/runner-protocol/src/index.ts +++ b/engine/sdks/typescript/runner-protocol/src/index.ts @@ -85,22 +85,19 @@ export function writeKvValue(bc: bare.ByteCursor, x: KvValue): void { export type KvMetadata = { readonly version: ArrayBuffer - /** - * TODO: Rename to update_ts - */ - readonly createTs: i64 + readonly updateTs: i64 } export function readKvMetadata(bc: bare.ByteCursor): KvMetadata { return { version: bare.readData(bc), - createTs: bare.readI64(bc), + updateTs: bare.readI64(bc), } } export function writeKvMetadata(bc: bare.ByteCursor, x: KvMetadata): void { bare.writeData(bc, x.version) - bare.writeI64(bc, x.createTs) + bare.writeI64(bc, x.updateTs) } /**