Skip to content

Commit 4019c28

Browse files
committed
fix(serverless): misc pr fixes (#3576)
1 parent a543cfc commit 4019c28

File tree

20 files changed

+155
-177
lines changed

20 files changed

+155
-177
lines changed

engine/packages/actor-kv/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -260,7 +260,7 @@ pub async fn put(
260260
&EntryMetadataKey::new(key.clone()),
261261
rp::KvMetadata {
262262
version: VERSION.as_bytes().to_vec(),
263-
create_ts: utils::now(),
263+
update_ts: utils::now(),
264264
},
265265
)?;
266266

engine/packages/api-peer/src/actors/kv_get.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,6 @@ pub async fn kv_get(ctx: ApiCtx, path: KvGetPath, _query: KvGetQuery) -> Result<
6060

6161
Ok(KvGetResponse {
6262
value: value_base64,
63-
// NOTE: Intentionally uses different name in public API. `create_ts` is actually
64-
// `update_ts`.
65-
update_ts: metadata[0].create_ts,
63+
update_ts: metadata[0].update_ts,
6664
})
6765
}

engine/packages/pegboard-runner/src/actor_event_demuxer.rs

Lines changed: 32 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -37,29 +37,20 @@ impl ActorEventDemuxer {
3737
pub fn ingest(&mut self, actor_id: Id, event: protocol::mk2::EventWrapper) {
3838
tracing::debug!(runner_id=?self.runner_id, ?actor_id, index=?event.checkpoint.index, "actor demuxer ingest");
3939

40-
if let Some(channel) = self.channels.get(&actor_id) {
40+
if let Some(channel) = self.channels.get_mut(&actor_id) {
4141
let _ = channel.tx.send(event);
42+
channel.last_seen = Instant::now();
4243
} else {
43-
let (tx, mut rx) = mpsc::unbounded_channel();
44-
45-
let ctx = self.ctx.clone();
46-
let runner_id = self.runner_id;
47-
let handle = tokio::spawn(async move {
48-
loop {
49-
let mut buffer = Vec::new();
50-
51-
// Batch process events
52-
if rx.recv_many(&mut buffer, 1024).await == 0 {
53-
break;
54-
}
55-
56-
if let Err(err) = dispatch_events(&ctx, runner_id, actor_id, buffer).await {
57-
tracing::error!(?err, "actor event processor failed");
58-
break;
59-
}
60-
}
61-
});
44+
let (tx, rx) = mpsc::unbounded_channel();
45+
46+
let handle = tokio::spawn(channel_handler(
47+
self.ctx.clone(),
48+
self.runner_id,
49+
actor_id,
50+
rx,
51+
));
6252

53+
// Send initial event
6354
let _ = tx.send(event);
6455

6556
self.channels.insert(
@@ -110,6 +101,27 @@ impl ActorEventDemuxer {
110101
}
111102
}
112103

104+
async fn channel_handler(
105+
ctx: StandaloneCtx,
106+
runner_id: Id,
107+
actor_id: Id,
108+
mut rx: mpsc::UnboundedReceiver<protocol::mk2::EventWrapper>,
109+
) {
110+
loop {
111+
let mut buffer = Vec::new();
112+
113+
// Batch process events
114+
if rx.recv_many(&mut buffer, 1024).await == 0 {
115+
break;
116+
}
117+
118+
if let Err(err) = dispatch_events(&ctx, runner_id, actor_id, buffer).await {
119+
tracing::error!(?err, "actor event processor failed");
120+
break;
121+
}
122+
}
123+
}
124+
113125
#[tracing::instrument(skip_all, fields(?runner_id, ?actor_id))]
114126
async fn dispatch_events(
115127
ctx: &StandaloneCtx,

engine/packages/pegboard-runner/src/ws_to_tunnel_task.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -558,7 +558,7 @@ async fn handle_message_mk1(ctx: &StandaloneCtx, conn: &Conn, msg: Bytes) -> Res
558558
.into_iter()
559559
.map(|x| protocol::KvMetadata {
560560
version: x.version,
561-
create_ts: x.create_ts,
561+
create_ts: x.update_ts,
562562
})
563563
.collect(),
564564
},
@@ -626,7 +626,7 @@ async fn handle_message_mk1(ctx: &StandaloneCtx, conn: &Conn, msg: Bytes) -> Res
626626
.into_iter()
627627
.map(|x| protocol::KvMetadata {
628628
version: x.version,
629-
create_ts: x.create_ts,
629+
create_ts: x.update_ts,
630630
})
631631
.collect(),
632632
},

engine/packages/pegboard/src/lib.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,9 @@ pub fn registry() -> WorkflowResult<Registry> {
1515
registry.register_workflow::<actor::Workflow>()?;
1616
registry.register_workflow::<runner::Workflow>()?;
1717
registry.register_workflow::<runner2::Workflow>()?;
18-
registry.register_workflow::<serverless::pool::Workflow>()?;
19-
registry.register_workflow::<serverless::runner::Workflow>()?;
20-
registry.register_workflow::<serverless::connection::Workflow>()?;
18+
registry.register_workflow::<runner_pool::Workflow>()?;
19+
registry.register_workflow::<serverless::conn::Workflow>()?;
20+
registry.register_workflow::<serverless::receiver::Workflow>()?;
2121

2222
Ok(registry)
2323
}

engine/packages/pegboard/src/ops/runner_config/delete.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ pub struct Input {
1111

1212
#[operation]
1313
pub async fn pegboard_runner_config_delete(ctx: &OperationCtx, input: &Input) -> Result<()> {
14-
let delete_workflow = ctx
14+
let delete_pool = ctx
1515
.udb()?
1616
.run(|tx| async move {
1717
let tx = tx.with_subspace(namespace::keys::subspace());
@@ -20,7 +20,7 @@ pub async fn pegboard_runner_config_delete(ctx: &OperationCtx, input: &Input) ->
2020
let runner_config_key =
2121
keys::runner_config::DataKey::new(input.namespace_id, input.name.clone());
2222

23-
let delete_workflow =
23+
let delete_pool =
2424
if let Some(config) = tx.read_opt(&runner_config_key, Serializable).await? {
2525
tx.delete(&runner_config_key);
2626

@@ -32,20 +32,20 @@ pub async fn pegboard_runner_config_delete(ctx: &OperationCtx, input: &Input) ->
3232
input.name.clone(),
3333
));
3434

35-
config.affects_autoscaler()
35+
config.affects_pool()
3636
} else {
3737
false
3838
};
3939

40-
Ok(delete_workflow)
40+
Ok(delete_pool)
4141
})
4242
.custom_instrument(tracing::info_span!("runner_config_delete_tx"))
4343
.await?;
4444

45-
// Bump autoscaler when a serverless config is modified
46-
if delete_workflow {
47-
ctx.signal(crate::workflows::serverless::pool::Bump {})
48-
.to_workflow::<crate::workflows::serverless::pool::Workflow>()
45+
// Bump pool when a serverless config is modified
46+
if delete_pool {
47+
ctx.signal(crate::workflows::runner_pool::Bump {})
48+
.to_workflow::<crate::workflows::runner_pool::Workflow>()
4949
.tag("namespace_id", input.namespace_id)
5050
.tag("runner_name", input.name.clone())
5151
.send()

engine/packages/pegboard/src/ops/runner_config/upsert.rs

Lines changed: 10 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ pub struct Input {
1313

1414
struct UpsertOutput {
1515
endpoint_config_changed: bool,
16-
serverless_runner_created: bool,
16+
pool_created: bool,
1717
}
1818

1919
#[operation]
@@ -52,31 +52,28 @@ pub async fn pegboard_runner_config_upsert(ctx: &OperationCtx, input: &Input) ->
5252
},
5353
) => UpsertOutput {
5454
endpoint_config_changed: old_url != new_url || old_headers != new_headers,
55-
serverless_runner_created: false,
55+
pool_created: false,
5656
},
5757
(RunnerConfigKind::Normal { .. }, RunnerConfigKind::Serverless { .. }) => {
5858
// Config type changed to serverless
5959
UpsertOutput {
6060
endpoint_config_changed: true,
61-
serverless_runner_created: true,
61+
pool_created: true,
6262
}
6363
}
6464
_ => {
6565
// Not serverless
6666
UpsertOutput {
6767
endpoint_config_changed: true,
68-
serverless_runner_created: false,
68+
pool_created: false,
6969
}
7070
}
7171
}
7272
} else {
7373
// New config
7474
UpsertOutput {
7575
endpoint_config_changed: true,
76-
serverless_runner_created: matches!(
77-
input.config.kind,
78-
RunnerConfigKind::Serverless { .. }
79-
),
76+
pool_created: matches!(input.config.kind, RunnerConfigKind::Serverless { .. }),
8077
}
8178
};
8279

@@ -161,9 +158,8 @@ pub async fn pegboard_runner_config_upsert(ctx: &OperationCtx, input: &Input) ->
161158
.await?
162159
.map_err(|err| err.build())?;
163160

164-
// Bump autoscaler
165-
if res.serverless_runner_created {
166-
ctx.workflow(crate::workflows::serverless::pool::Input {
161+
if res.pool_created {
162+
ctx.workflow(crate::workflows::runner_pool::Input {
167163
namespace_id: input.namespace_id,
168164
runner_name: input.name.clone(),
169165
})
@@ -172,10 +168,9 @@ pub async fn pegboard_runner_config_upsert(ctx: &OperationCtx, input: &Input) ->
172168
.unique()
173169
.dispatch()
174170
.await?;
175-
} else if input.config.affects_autoscaler() {
176-
// Maybe scale it
177-
ctx.signal(crate::workflows::serverless::pool::Bump {})
178-
.to_workflow::<crate::workflows::serverless::pool::Workflow>()
171+
} else if input.config.affects_pool() {
172+
ctx.signal(crate::workflows::runner_pool::Bump {})
173+
.to_workflow::<crate::workflows::runner_pool::Workflow>()
179174
.tag("namespace_id", input.namespace_id)
180175
.tag("runner_name", input.name.clone())
181176
.send()

engine/packages/pegboard/src/workflows/actor/destroy.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,16 +30,16 @@ pub(crate) async fn pegboard_actor_destroy(ctx: &mut WorkflowCtx, input: &Input)
3030
})
3131
.await?;
3232

33-
// If a slot was allocated at the time of actor destruction then bump the serverless autoscaler so it can scale down
33+
// If a slot was allocated at the time of actor destruction then bump the runner pool so it can scale down
3434
// if needed
3535
if res.allocated_serverless_slot {
3636
ctx.removed::<Message<super::BumpServerlessAutoscalerStub>>()
3737
.await?;
3838

3939
let bump_res = ctx
4040
.v(2)
41-
.signal(crate::workflows::serverless::pool::Bump {})
42-
.to_workflow::<crate::workflows::serverless::pool::Workflow>()
41+
.signal(crate::workflows::runner_pool::Bump {})
42+
.to_workflow::<crate::workflows::runner_pool::Workflow>()
4343
.tag("namespace_id", input.namespace_id)
4444
.tag("runner_name", res.runner_name_selector.clone())
4545
.send()

engine/packages/pegboard/src/workflows/actor/mod.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -842,15 +842,15 @@ async fn handle_stopped(
842842
.await?;
843843

844844
if allocate_pending_res.allocations.is_empty() {
845-
// Bump autoscaler so it can scale down if needed
845+
// Bump pool so it can scale down if needed
846846
if deallocate_res.for_serverless {
847847
ctx.removed::<Message<BumpServerlessAutoscalerStub>>()
848848
.await?;
849849

850850
let res = ctx
851851
.v(2)
852-
.signal(crate::workflows::serverless::pool::Bump {})
853-
.to_workflow::<crate::workflows::serverless::pool::Workflow>()
852+
.signal(crate::workflows::runner_pool::Bump {})
853+
.to_workflow::<crate::workflows::runner_pool::Workflow>()
854854
.tag("namespace_id", input.namespace_id)
855855
.tag("runner_name", input.runner_name_selector.clone())
856856
.send()

engine/packages/pegboard/src/workflows/actor/runtime.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -637,12 +637,12 @@ pub async fn spawn_actor(
637637
ctx.removed::<Message<super::BumpServerlessAutoscalerStub>>()
638638
.await?;
639639

640-
// Bump the autoscaler so it can scale up
640+
// Bump the pool so it can scale up
641641
if allocate_res.serverless {
642642
let res = ctx
643643
.v(2)
644-
.signal(crate::workflows::serverless::pool::Bump {})
645-
.to_workflow::<crate::workflows::serverless::pool::Workflow>()
644+
.signal(crate::workflows::runner_pool::Bump {})
645+
.to_workflow::<crate::workflows::runner_pool::Workflow>()
646646
.tag("namespace_id", input.namespace_id)
647647
.tag("runner_name", input.runner_name_selector.clone())
648648
.send()
@@ -728,12 +728,12 @@ pub async fn spawn_actor(
728728
ctx.removed::<Message<super::BumpServerlessAutoscalerStub>>()
729729
.await?;
730730

731-
// Bump the autoscaler so it can scale up
731+
// Bump the pool so it can scale up
732732
if allocate_res.serverless {
733733
let res = ctx
734734
.v(2)
735-
.signal(crate::workflows::serverless::pool::Bump {})
736-
.to_workflow::<crate::workflows::serverless::pool::Workflow>()
735+
.signal(crate::workflows::runner_pool::Bump {})
736+
.to_workflow::<crate::workflows::runner_pool::Workflow>()
737737
.tag("namespace_id", input.namespace_id)
738738
.tag("runner_name", input.runner_name_selector.clone())
739739
.send()

0 commit comments

Comments
 (0)