Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion engine/packages/actor-kv/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ pub async fn put(
&EntryMetadataKey::new(key.clone()),
rp::KvMetadata {
version: VERSION.as_bytes().to_vec(),
create_ts: utils::now(),
update_ts: utils::now(),
},
)?;

Expand Down
4 changes: 1 addition & 3 deletions engine/packages/api-peer/src/actors/kv_get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,6 @@ pub async fn kv_get(ctx: ApiCtx, path: KvGetPath, _query: KvGetQuery) -> Result<

Ok(KvGetResponse {
value: value_base64,
// NOTE: Intentionally uses different name in public API. `create_ts` is actually
// `update_ts`.
update_ts: metadata[0].create_ts,
update_ts: metadata[0].update_ts,
})
}
52 changes: 32 additions & 20 deletions engine/packages/pegboard-runner/src/actor_event_demuxer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,29 +37,20 @@ impl ActorEventDemuxer {
pub fn ingest(&mut self, actor_id: Id, event: protocol::mk2::EventWrapper) {
tracing::debug!(runner_id=?self.runner_id, ?actor_id, index=?event.checkpoint.index, "actor demuxer ingest");

if let Some(channel) = self.channels.get(&actor_id) {
if let Some(channel) = self.channels.get_mut(&actor_id) {
let _ = channel.tx.send(event);
channel.last_seen = Instant::now();
} else {
let (tx, mut rx) = mpsc::unbounded_channel();

let ctx = self.ctx.clone();
let runner_id = self.runner_id;
let handle = tokio::spawn(async move {
loop {
let mut buffer = Vec::new();

// Batch process events
if rx.recv_many(&mut buffer, 1024).await == 0 {
break;
}

if let Err(err) = dispatch_events(&ctx, runner_id, actor_id, buffer).await {
tracing::error!(?err, "actor event processor failed");
break;
}
}
});
let (tx, rx) = mpsc::unbounded_channel();

let handle = tokio::spawn(channel_handler(
self.ctx.clone(),
self.runner_id,
actor_id,
rx,
));

// Send initial event
let _ = tx.send(event);

self.channels.insert(
Expand Down Expand Up @@ -110,6 +101,27 @@ impl ActorEventDemuxer {
}
}

async fn channel_handler(
ctx: StandaloneCtx,
runner_id: Id,
actor_id: Id,
mut rx: mpsc::UnboundedReceiver<protocol::mk2::EventWrapper>,
) {
loop {
let mut buffer = Vec::new();

// Batch process events
if rx.recv_many(&mut buffer, 1024).await == 0 {
break;
}

if let Err(err) = dispatch_events(&ctx, runner_id, actor_id, buffer).await {
tracing::error!(?err, "actor event processor failed");
break;
}
}
}

#[tracing::instrument(skip_all, fields(?runner_id, ?actor_id))]
async fn dispatch_events(
ctx: &StandaloneCtx,
Expand Down
4 changes: 2 additions & 2 deletions engine/packages/pegboard-runner/src/ws_to_tunnel_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -558,7 +558,7 @@ async fn handle_message_mk1(ctx: &StandaloneCtx, conn: &Conn, msg: Bytes) -> Res
.into_iter()
.map(|x| protocol::KvMetadata {
version: x.version,
create_ts: x.create_ts,
create_ts: x.update_ts,
})
.collect(),
},
Expand Down Expand Up @@ -626,7 +626,7 @@ async fn handle_message_mk1(ctx: &StandaloneCtx, conn: &Conn, msg: Bytes) -> Res
.into_iter()
.map(|x| protocol::KvMetadata {
version: x.version,
create_ts: x.create_ts,
create_ts: x.update_ts,
})
.collect(),
},
Expand Down
6 changes: 3 additions & 3 deletions engine/packages/pegboard/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ pub fn registry() -> WorkflowResult<Registry> {
registry.register_workflow::<actor::Workflow>()?;
registry.register_workflow::<runner::Workflow>()?;
registry.register_workflow::<runner2::Workflow>()?;
registry.register_workflow::<serverless::pool::Workflow>()?;
registry.register_workflow::<serverless::runner::Workflow>()?;
registry.register_workflow::<serverless::connection::Workflow>()?;
registry.register_workflow::<runner_pool::Workflow>()?;
registry.register_workflow::<serverless::conn::Workflow>()?;
registry.register_workflow::<serverless::receiver::Workflow>()?;

Ok(registry)
}
16 changes: 8 additions & 8 deletions engine/packages/pegboard/src/ops/runner_config/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ pub struct Input {

#[operation]
pub async fn pegboard_runner_config_delete(ctx: &OperationCtx, input: &Input) -> Result<()> {
let delete_workflow = ctx
let delete_pool = ctx
.udb()?
.run(|tx| async move {
let tx = tx.with_subspace(namespace::keys::subspace());
Expand All @@ -20,7 +20,7 @@ pub async fn pegboard_runner_config_delete(ctx: &OperationCtx, input: &Input) ->
let runner_config_key =
keys::runner_config::DataKey::new(input.namespace_id, input.name.clone());

let delete_workflow =
let delete_pool =
if let Some(config) = tx.read_opt(&runner_config_key, Serializable).await? {
tx.delete(&runner_config_key);

Expand All @@ -32,20 +32,20 @@ pub async fn pegboard_runner_config_delete(ctx: &OperationCtx, input: &Input) ->
input.name.clone(),
));

config.affects_autoscaler()
config.affects_pool()
} else {
false
};

Ok(delete_workflow)
Ok(delete_pool)
})
.custom_instrument(tracing::info_span!("runner_config_delete_tx"))
.await?;

// Bump autoscaler when a serverless config is modified
if delete_workflow {
ctx.signal(crate::workflows::serverless::pool::Bump {})
.to_workflow::<crate::workflows::serverless::pool::Workflow>()
// Bump pool when a serverless config is modified
if delete_pool {
ctx.signal(crate::workflows::runner_pool::Bump {})
.to_workflow::<crate::workflows::runner_pool::Workflow>()
.tag("namespace_id", input.namespace_id)
.tag("runner_name", input.name.clone())
.send()
Expand Down
25 changes: 10 additions & 15 deletions engine/packages/pegboard/src/ops/runner_config/upsert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ pub struct Input {

struct UpsertOutput {
endpoint_config_changed: bool,
serverless_runner_created: bool,
pool_created: bool,
}

#[operation]
Expand Down Expand Up @@ -52,31 +52,28 @@ pub async fn pegboard_runner_config_upsert(ctx: &OperationCtx, input: &Input) ->
},
) => UpsertOutput {
endpoint_config_changed: old_url != new_url || old_headers != new_headers,
serverless_runner_created: false,
pool_created: false,
},
(RunnerConfigKind::Normal { .. }, RunnerConfigKind::Serverless { .. }) => {
// Config type changed to serverless
UpsertOutput {
endpoint_config_changed: true,
serverless_runner_created: true,
pool_created: true,
}
}
_ => {
// Not serverless
UpsertOutput {
endpoint_config_changed: true,
serverless_runner_created: false,
pool_created: false,
}
}
}
} else {
// New config
UpsertOutput {
endpoint_config_changed: true,
serverless_runner_created: matches!(
input.config.kind,
RunnerConfigKind::Serverless { .. }
),
pool_created: matches!(input.config.kind, RunnerConfigKind::Serverless { .. }),
}
};

Expand Down Expand Up @@ -161,9 +158,8 @@ pub async fn pegboard_runner_config_upsert(ctx: &OperationCtx, input: &Input) ->
.await?
.map_err(|err| err.build())?;

// Bump autoscaler
if res.serverless_runner_created {
ctx.workflow(crate::workflows::serverless::pool::Input {
if res.pool_created {
ctx.workflow(crate::workflows::runner_pool::Input {
namespace_id: input.namespace_id,
runner_name: input.name.clone(),
})
Expand All @@ -172,10 +168,9 @@ pub async fn pegboard_runner_config_upsert(ctx: &OperationCtx, input: &Input) ->
.unique()
.dispatch()
.await?;
} else if input.config.affects_autoscaler() {
// Maybe scale it
ctx.signal(crate::workflows::serverless::pool::Bump {})
.to_workflow::<crate::workflows::serverless::pool::Workflow>()
} else if input.config.affects_pool() {
ctx.signal(crate::workflows::runner_pool::Bump {})
.to_workflow::<crate::workflows::runner_pool::Workflow>()
.tag("namespace_id", input.namespace_id)
.tag("runner_name", input.name.clone())
.send()
Expand Down
6 changes: 3 additions & 3 deletions engine/packages/pegboard/src/workflows/actor/destroy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,16 @@ pub(crate) async fn pegboard_actor_destroy(ctx: &mut WorkflowCtx, input: &Input)
})
.await?;

// If a slot was allocated at the time of actor destruction then bump the serverless autoscaler so it can scale down
// If a slot was allocated at the time of actor destruction then bump the runner pool so it can scale down
// if needed
if res.allocated_serverless_slot {
ctx.removed::<Message<super::BumpServerlessAutoscalerStub>>()
.await?;

let bump_res = ctx
.v(2)
.signal(crate::workflows::serverless::pool::Bump {})
.to_workflow::<crate::workflows::serverless::pool::Workflow>()
.signal(crate::workflows::runner_pool::Bump {})
.to_workflow::<crate::workflows::runner_pool::Workflow>()
.tag("namespace_id", input.namespace_id)
.tag("runner_name", res.runner_name_selector.clone())
.send()
Expand Down
6 changes: 3 additions & 3 deletions engine/packages/pegboard/src/workflows/actor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -842,15 +842,15 @@ async fn handle_stopped(
.await?;

if allocate_pending_res.allocations.is_empty() {
// Bump autoscaler so it can scale down if needed
// Bump pool so it can scale down if needed
if deallocate_res.for_serverless {
ctx.removed::<Message<BumpServerlessAutoscalerStub>>()
.await?;

let res = ctx
.v(2)
.signal(crate::workflows::serverless::pool::Bump {})
.to_workflow::<crate::workflows::serverless::pool::Workflow>()
.signal(crate::workflows::runner_pool::Bump {})
.to_workflow::<crate::workflows::runner_pool::Workflow>()
.tag("namespace_id", input.namespace_id)
.tag("runner_name", input.runner_name_selector.clone())
.send()
Expand Down
12 changes: 6 additions & 6 deletions engine/packages/pegboard/src/workflows/actor/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -637,12 +637,12 @@ pub async fn spawn_actor(
ctx.removed::<Message<super::BumpServerlessAutoscalerStub>>()
.await?;

// Bump the autoscaler so it can scale up
// Bump the pool so it can scale up
if allocate_res.serverless {
let res = ctx
.v(2)
.signal(crate::workflows::serverless::pool::Bump {})
.to_workflow::<crate::workflows::serverless::pool::Workflow>()
.signal(crate::workflows::runner_pool::Bump {})
.to_workflow::<crate::workflows::runner_pool::Workflow>()
.tag("namespace_id", input.namespace_id)
.tag("runner_name", input.runner_name_selector.clone())
.send()
Expand Down Expand Up @@ -728,12 +728,12 @@ pub async fn spawn_actor(
ctx.removed::<Message<super::BumpServerlessAutoscalerStub>>()
.await?;

// Bump the autoscaler so it can scale up
// Bump the pool so it can scale up
if allocate_res.serverless {
let res = ctx
.v(2)
.signal(crate::workflows::serverless::pool::Bump {})
.to_workflow::<crate::workflows::serverless::pool::Workflow>()
.signal(crate::workflows::runner_pool::Bump {})
.to_workflow::<crate::workflows::runner_pool::Workflow>()
.tag("namespace_id", input.namespace_id)
.tag("runner_name", input.runner_name_selector.clone())
.send()
Expand Down
1 change: 1 addition & 0 deletions engine/packages/pegboard/src/workflows/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
pub mod actor;
pub mod runner;
pub mod runner2;
pub mod runner_pool;
pub mod serverless;
3 changes: 2 additions & 1 deletion engine/packages/pegboard/src/workflows/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ pub async fn pegboard_runner(ctx: &mut WorkflowCtx, input: &Input) -> Result<()>
.await?;
}

// Check for pending actors (which happen when there is not enough runner capacity)
let res = ctx
.activity(AllocatePendingActorsInput {
namespace_id: input.namespace_id,
Expand Down Expand Up @@ -282,7 +283,7 @@ pub async fn pegboard_runner(ctx: &mut WorkflowCtx, input: &Input) -> Result<()>
}
}
Some(Main::CheckQueue(_)) => {
// Check for pending actors
// Check for pending actors (which happen when there is not enough runner capacity)
let res = ctx
.activity(AllocatePendingActorsInput {
namespace_id: input.namespace_id,
Expand Down
6 changes: 2 additions & 4 deletions engine/packages/pegboard/src/workflows/runner2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,7 @@ pub async fn pegboard_runner2(ctx: &mut WorkflowCtx, input: &Input) -> Result<()
})
.await?;

// NOTE: This is only if there's a queue of actors. this path is not used if there is enough capacity of
// runners, the actor wf allocates itself independently
// Check for pending actors
// Check for pending actors (which happen when there is not enough runner capacity)
let res = ctx
.activity(AllocatePendingActorsInput {
namespace_id: input.namespace_id,
Expand Down Expand Up @@ -90,7 +88,7 @@ pub async fn pegboard_runner2(ctx: &mut WorkflowCtx, input: &Input) -> Result<()
}
}
Some(Main::CheckQueue(_)) => {
// Check for pending actors
// Check for pending actors (which happen when there is not enough runner capacity)
let res = ctx
.activity(AllocatePendingActorsInput {
namespace_id: input.namespace_id,
Expand Down
Loading
Loading