Skip to content

Commit b9c23a8

Browse files
committed
fix: add serverless backfill
1 parent a7e1a7c commit b9c23a8

File tree

8 files changed

+124
-92
lines changed

8 files changed

+124
-92
lines changed

Cargo.lock

Lines changed: 14 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11

22
[workspace]
33
resolver = "2"
4-
members = ["engine/packages/actor-kv","engine/packages/api-builder","engine/packages/api-peer","engine/packages/api-public","engine/packages/api-types","engine/packages/api-util","engine/packages/bootstrap","engine/packages/cache","engine/packages/cache-purge","engine/packages/cache-result","engine/packages/clickhouse-inserter","engine/packages/clickhouse-user-query","engine/packages/config","engine/packages/dump-openapi","engine/packages/engine","engine/packages/env","engine/packages/epoxy","engine/packages/error","engine/packages/error-macros","engine/packages/gasoline","engine/packages/gasoline-macros","engine/packages/guard","engine/packages/guard-core","engine/packages/logs","engine/packages/metrics","engine/packages/namespace","engine/packages/pegboard","engine/packages/pegboard-gateway","engine/packages/pegboard-runner","engine/packages/pools","engine/packages/runtime","engine/packages/service-manager","engine/packages/telemetry","engine/packages/test-deps","engine/packages/test-deps-docker","engine/packages/tracing-reconfigure","engine/packages/types","engine/packages/universaldb","engine/packages/universalpubsub","engine/packages/util","engine/packages/util-id","engine/packages/workflow-worker","engine/sdks/rust/api-full","engine/sdks/rust/data","engine/sdks/rust/epoxy-protocol","engine/sdks/rust/runner-protocol","engine/sdks/rust/ups-protocol"]
4+
members = ["engine/packages/actor-kv","engine/packages/api-builder","engine/packages/api-peer","engine/packages/api-public","engine/packages/api-types","engine/packages/api-util","engine/packages/bootstrap","engine/packages/cache","engine/packages/cache-purge","engine/packages/cache-result","engine/packages/clickhouse-inserter","engine/packages/clickhouse-user-query","engine/packages/config","engine/packages/dump-openapi","engine/packages/engine","engine/packages/env","engine/packages/epoxy","engine/packages/error","engine/packages/error-macros","engine/packages/gasoline","engine/packages/gasoline-macros","engine/packages/guard","engine/packages/guard-core","engine/packages/logs","engine/packages/metrics","engine/packages/namespace","engine/packages/pegboard","engine/packages/pegboard-gateway","engine/packages/pegboard-runner","engine/packages/pools","engine/packages/runtime","engine/packages/serverless-backfill","engine/packages/service-manager","engine/packages/telemetry","engine/packages/test-deps","engine/packages/test-deps-docker","engine/packages/tracing-reconfigure","engine/packages/tracing-utils","engine/packages/types","engine/packages/universaldb","engine/packages/universalpubsub","engine/packages/util","engine/packages/util-id","engine/packages/workflow-worker","engine/sdks/rust/api-full","engine/sdks/rust/data","engine/sdks/rust/epoxy-protocol","engine/sdks/rust/runner-protocol","engine/sdks/rust/ups-protocol"]
55

66
[workspace.package]
77
version = "2.0.25"
@@ -83,10 +83,13 @@ tracing = "0.1.40"
8383
tracing-core = "0.1"
8484
tracing-opentelemetry = "0.29"
8585
tracing-slog = "0.2"
86-
vergen = { version = "9.0.4", features = ["build", "cargo", "rustc"] }
8786
vergen-gitcl = "1.0.0"
8887
reqwest-eventsource = "0.6.0"
8988

89+
[workspace.dependencies.vergen]
90+
version = "9.0.4"
91+
features = ["build","cargo","rustc"]
92+
9093
[workspace.dependencies.sentry]
9194
version = "0.45.0"
9295
default-features = false
@@ -148,7 +151,7 @@ features = ["now"]
148151

149152
[workspace.dependencies.clap]
150153
version = "4.3"
151-
features = ["derive", "cargo"]
154+
features = ["derive","cargo"]
152155

153156
[workspace.dependencies.rivet-term]
154157
git = "https://github.com/rivet-dev/rivet-term"
@@ -357,6 +360,9 @@ path = "engine/packages/pools"
357360
[workspace.dependencies.rivet-runtime]
358361
path = "engine/packages/runtime"
359362

363+
[workspace.dependencies.rivet-serverless-backfill]
364+
path = "engine/packages/serverless-backfill"
365+
360366
[workspace.dependencies.rivet-service-manager]
361367
path = "engine/packages/service-manager"
362368

@@ -425,8 +431,3 @@ debug = false
425431
lto = "fat"
426432
codegen-units = 1
427433
opt-level = 3
428-
429-
# strip = true
430-
# panic = "abort"
431-
# overflow-checks = false
432-
# debug-assertions = false

engine/packages/engine/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ rivet-guard.workspace = true
3232
rivet-logs.workspace = true
3333
rivet-pools.workspace = true
3434
rivet-runtime.workspace = true
35+
rivet-serverless-backfill.workspace = true
3536
rivet-service-manager.workspace = true
3637
rivet-telemetry.workspace = true
3738
rivet-term.workspace = true

engine/packages/engine/src/run_config.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,12 @@ pub fn config(_rivet_config: rivet_config::Config) -> Result<RunConfigData> {
4040
|config, pools| Box::pin(rivet_cache_purge::start(config, pools)),
4141
false,
4242
),
43+
Service::new(
44+
"serverless_backfill",
45+
ServiceKind::Oneshot,
46+
|config, pools| Box::pin(rivet_serverless_backfill::start(config, pools)),
47+
false,
48+
),
4349
];
4450

4551
Ok(RunConfigData { services })
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
[package]
2+
name = "rivet-serverless-backfill"
3+
version = "0.1.0"
4+
edition = "2021"
5+
6+
[dependencies]
7+
anyhow.workspace = true
8+
gas.workspace = true
9+
pegboard.workspace = true
10+
rivet-config.workspace = true
11+
rivet-types.workspace = true
12+
tracing.workspace = true
13+
universaldb.workspace = true
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
use anyhow::Result;
2+
use futures_util::{StreamExt, TryStreamExt};
3+
use gas::prelude::*;
4+
use universaldb::options::StreamingMode;
5+
use universaldb::utils::IsolationLevel::*;
6+
7+
#[tracing::instrument(skip_all)]
8+
pub async fn start(config: rivet_config::Config, pools: rivet_pools::Pools) -> Result<()> {
9+
let cache = rivet_cache::CacheInner::from_env(&config, pools.clone())?;
10+
let ctx = StandaloneCtx::new(
11+
db::DatabaseKv::from_pools(pools.clone()).await?,
12+
config.clone(),
13+
pools,
14+
cache,
15+
"serverless_backfill",
16+
Id::new_v1(config.dc_label()),
17+
Id::new_v1(config.dc_label()),
18+
)?;
19+
20+
let serverless_data = ctx
21+
.udb()?
22+
.run(|tx| async move {
23+
let tx = tx.with_subspace(pegboard::keys::subspace());
24+
25+
let serverless_desired_subspace = pegboard::keys::subspace().subspace(
26+
&rivet_types::keys::pegboard::ns::ServerlessDesiredSlotsKey::entire_subspace(),
27+
);
28+
29+
tx.get_ranges_keyvalues(
30+
universaldb::RangeOption {
31+
mode: StreamingMode::WantAll,
32+
..(&serverless_desired_subspace).into()
33+
},
34+
// NOTE: This is a snapshot to prevent conflict with updates to this subspace
35+
Snapshot,
36+
)
37+
.map(|res| {
38+
tx.unpack::<rivet_types::keys::pegboard::ns::ServerlessDesiredSlotsKey>(res?.key())
39+
})
40+
.try_collect::<Vec<_>>()
41+
.await
42+
})
43+
.custom_instrument(tracing::info_span!("read_serverless_tx"))
44+
.await?;
45+
46+
let runner_configs = ctx
47+
.op(pegboard::ops::runner_config::get::Input {
48+
runners: serverless_data
49+
.iter()
50+
.map(|key| (key.namespace_id, key.runner_name.clone()))
51+
.collect(),
52+
bypass_cache: true,
53+
})
54+
.await?;
55+
56+
for key in &serverless_data {
57+
if !runner_configs
58+
.iter()
59+
.any(|rc| rc.namespace_id == key.namespace_id)
60+
{
61+
tracing::debug!(
62+
namespace_id=?key.namespace_id,
63+
runner_name=?key.runner_name,
64+
"runner config not found, likely deleted"
65+
);
66+
continue;
67+
};
68+
69+
ctx.workflow(pegboard::workflows::runner_pool::Input {
70+
namespace_id: key.namespace_id,
71+
runner_name: key.runner_name.clone(),
72+
})
73+
.tag("namespace_id", key.namespace_id)
74+
.tag("runner_name", key.runner_name.clone())
75+
.unique()
76+
.dispatch()
77+
.await?;
78+
}
79+
80+
Ok(())
81+
}

k8s/engine/06-rivet-engine-singleton-deployment.yaml

Lines changed: 0 additions & 79 deletions
This file was deleted.

scripts/run/k8s/engine.sh

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -54,16 +54,11 @@ kubectl apply -f 02-engine-configmap.yaml
5454
kubectl apply -f 03-rivet-engine-deployment.yaml
5555
kubectl apply -f 04-rivet-engine-service.yaml
5656
kubectl apply -f 05-rivet-engine-hpa.yaml
57-
kubectl apply -f 06-rivet-engine-singleton-deployment.yaml
5857

5958
# Wait for engine to be ready
6059
echo "Waiting for engine to be ready..."
6160
kubectl -n "${NAMESPACE}" wait --for=condition=ready pod -l app=rivet-engine --timeout=300s
6261

63-
# Wait for singleton to be ready
64-
echo "Waiting for singleton to be ready..."
65-
kubectl -n "${NAMESPACE}" wait --for=condition=ready pod -l app=rivet-engine-singleton --timeout=300s
66-
6762
echo ""
6863
echo "Deployment complete."
6964
echo ""

0 commit comments

Comments
 (0)