From 9284eb324a060af2125c9048cbeef98c48d6f5e7 Mon Sep 17 00:00:00 2001 From: MasterPtato Date: Thu, 4 Dec 2025 12:38:44 -0800 Subject: [PATCH] fix: add serverless backfill --- Cargo.lock | 14 +++ Cargo.toml | 17 ++-- engine/packages/engine/Cargo.toml | 1 + engine/packages/engine/src/run_config.rs | 6 ++ .../packages/serverless-backfill/Cargo.toml | 13 +++ .../packages/serverless-backfill/src/lib.rs | 87 +++++++++++++++++++ .../06-rivet-engine-singleton-deployment.yaml | 79 ----------------- scripts/run/k8s/engine.sh | 5 -- 8 files changed, 130 insertions(+), 92 deletions(-) create mode 100644 engine/packages/serverless-backfill/Cargo.toml create mode 100644 engine/packages/serverless-backfill/src/lib.rs delete mode 100644 k8s/engine/06-rivet-engine-singleton-deployment.yaml diff --git a/Cargo.lock b/Cargo.lock index f4262236de..b9ffce2777 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4478,6 +4478,7 @@ dependencies = [ "rivet-pools", "rivet-runner-protocol", "rivet-runtime", + "rivet-serverless-backfill", "rivet-service-manager", "rivet-telemetry", "rivet-term", @@ -4739,6 +4740,19 @@ dependencies = [ "tracing-subscriber", ] +[[package]] +name = "rivet-serverless-backfill" +version = "0.1.0" +dependencies = [ + "anyhow", + "gasoline", + "pegboard", + "rivet-config", + "rivet-types", + "tracing", + "universaldb", +] + [[package]] name = "rivet-service-manager" version = "2.0.25" diff --git a/Cargo.toml b/Cargo.toml index 5ea0101e0d..4d31be8acb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [workspace] resolver = "2" -members = ["engine/packages/actor-kv","engine/packages/api-builder","engine/packages/api-peer","engine/packages/api-public","engine/packages/api-types","engine/packages/api-util","engine/packages/bootstrap","engine/packages/cache","engine/packages/cache-purge","engine/packages/cache-result","engine/packages/clickhouse-inserter","engine/packages/clickhouse-user-query","engine/packages/config","engine/packages/dump-openapi","engine/packages/engine","engine/packages/env","engine/packages/epoxy","engine/packages/error","engine/packages/error-macros","engine/packages/gasoline","engine/packages/gasoline-macros","engine/packages/guard","engine/packages/guard-core","engine/packages/logs","engine/packages/metrics","engine/packages/namespace","engine/packages/pegboard","engine/packages/pegboard-gateway","engine/packages/pegboard-runner","engine/packages/pools","engine/packages/runtime","engine/packages/service-manager","engine/packages/telemetry","engine/packages/test-deps","engine/packages/test-deps-docker","engine/packages/tracing-reconfigure","engine/packages/types","engine/packages/universaldb","engine/packages/universalpubsub","engine/packages/util","engine/packages/util-id","engine/packages/workflow-worker","engine/sdks/rust/api-full","engine/sdks/rust/data","engine/sdks/rust/epoxy-protocol","engine/sdks/rust/runner-protocol","engine/sdks/rust/ups-protocol"] +members = ["engine/packages/actor-kv","engine/packages/api-builder","engine/packages/api-peer","engine/packages/api-public","engine/packages/api-types","engine/packages/api-util","engine/packages/bootstrap","engine/packages/cache","engine/packages/cache-purge","engine/packages/cache-result","engine/packages/clickhouse-inserter","engine/packages/clickhouse-user-query","engine/packages/config","engine/packages/dump-openapi","engine/packages/engine","engine/packages/env","engine/packages/epoxy","engine/packages/error","engine/packages/error-macros","engine/packages/gasoline","engine/packages/gasoline-macros","engine/packages/guard","engine/packages/guard-core","engine/packages/logs","engine/packages/metrics","engine/packages/namespace","engine/packages/pegboard","engine/packages/pegboard-gateway","engine/packages/pegboard-runner","engine/packages/pools","engine/packages/runtime","engine/packages/serverless-backfill","engine/packages/service-manager","engine/packages/telemetry","engine/packages/test-deps","engine/packages/test-deps-docker","engine/packages/tracing-reconfigure","engine/packages/tracing-utils","engine/packages/types","engine/packages/universaldb","engine/packages/universalpubsub","engine/packages/util","engine/packages/util-id","engine/packages/workflow-worker","engine/sdks/rust/api-full","engine/sdks/rust/data","engine/sdks/rust/epoxy-protocol","engine/sdks/rust/runner-protocol","engine/sdks/rust/ups-protocol"] [workspace.package] version = "2.0.25" @@ -83,10 +83,13 @@ tracing = "0.1.40" tracing-core = "0.1" tracing-opentelemetry = "0.29" tracing-slog = "0.2" -vergen = { version = "9.0.4", features = ["build", "cargo", "rustc"] } vergen-gitcl = "1.0.0" reqwest-eventsource = "0.6.0" +[workspace.dependencies.vergen] +version = "9.0.4" +features = ["build","cargo","rustc"] + [workspace.dependencies.sentry] version = "0.45.0" default-features = false @@ -148,7 +151,7 @@ features = ["now"] [workspace.dependencies.clap] version = "4.3" -features = ["derive", "cargo"] +features = ["derive","cargo"] [workspace.dependencies.rivet-term] git = "https://github.com/rivet-dev/rivet-term" @@ -357,6 +360,9 @@ path = "engine/packages/pools" [workspace.dependencies.rivet-runtime] path = "engine/packages/runtime" +[workspace.dependencies.rivet-serverless-backfill] +path = "engine/packages/serverless-backfill" + [workspace.dependencies.rivet-service-manager] path = "engine/packages/service-manager" @@ -425,8 +431,3 @@ debug = false lto = "fat" codegen-units = 1 opt-level = 3 - -# strip = true -# panic = "abort" -# overflow-checks = false -# debug-assertions = false diff --git a/engine/packages/engine/Cargo.toml b/engine/packages/engine/Cargo.toml index 0463022ad3..f1b6d7ebb9 100644 --- a/engine/packages/engine/Cargo.toml +++ b/engine/packages/engine/Cargo.toml @@ -32,6 +32,7 @@ rivet-guard.workspace = true rivet-logs.workspace = true rivet-pools.workspace = true rivet-runtime.workspace = true +rivet-serverless-backfill.workspace = true rivet-service-manager.workspace = true rivet-telemetry.workspace = true rivet-term.workspace = true diff --git a/engine/packages/engine/src/run_config.rs b/engine/packages/engine/src/run_config.rs index 635575993d..a17033164a 100644 --- a/engine/packages/engine/src/run_config.rs +++ b/engine/packages/engine/src/run_config.rs @@ -40,6 +40,12 @@ pub fn config(_rivet_config: rivet_config::Config) -> Result { |config, pools| Box::pin(rivet_cache_purge::start(config, pools)), false, ), + Service::new( + "serverless_backfill", + ServiceKind::Oneshot, + |config, pools| Box::pin(rivet_serverless_backfill::start(config, pools)), + false, + ), ]; Ok(RunConfigData { services }) diff --git a/engine/packages/serverless-backfill/Cargo.toml b/engine/packages/serverless-backfill/Cargo.toml new file mode 100644 index 0000000000..a1ad64bf2b --- /dev/null +++ b/engine/packages/serverless-backfill/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "rivet-serverless-backfill" +version = "0.1.0" +edition = "2021" + +[dependencies] +anyhow.workspace = true +gas.workspace = true +pegboard.workspace = true +rivet-config.workspace = true +rivet-types.workspace = true +tracing.workspace = true +universaldb.workspace = true diff --git a/engine/packages/serverless-backfill/src/lib.rs b/engine/packages/serverless-backfill/src/lib.rs new file mode 100644 index 0000000000..5213a2be43 --- /dev/null +++ b/engine/packages/serverless-backfill/src/lib.rs @@ -0,0 +1,87 @@ +use anyhow::Result; +use futures_util::{StreamExt, TryStreamExt}; +use gas::prelude::*; +use universaldb::options::StreamingMode; +use universaldb::utils::IsolationLevel::*; + +#[tracing::instrument(skip_all)] +pub async fn start(config: rivet_config::Config, pools: rivet_pools::Pools) -> Result<()> { + let cache = rivet_cache::CacheInner::from_env(&config, pools.clone())?; + let ctx = StandaloneCtx::new( + db::DatabaseKv::from_pools(pools.clone()).await?, + config.clone(), + pools, + cache, + "serverless_backfill", + Id::new_v1(config.dc_label()), + Id::new_v1(config.dc_label()), + )?; + + let serverless_data = ctx + .udb()? + .run(|tx| async move { + let tx = tx.with_subspace(pegboard::keys::subspace()); + + let serverless_desired_subspace = pegboard::keys::subspace().subspace( + &rivet_types::keys::pegboard::ns::ServerlessDesiredSlotsKey::entire_subspace(), + ); + + tx.get_ranges_keyvalues( + universaldb::RangeOption { + mode: StreamingMode::WantAll, + ..(&serverless_desired_subspace).into() + }, + // NOTE: This is a snapshot to prevent conflict with updates to this subspace + Snapshot, + ) + .map(|res| { + tx.unpack::(res?.key()) + }) + .try_collect::>() + .await + }) + .custom_instrument(tracing::info_span!("read_serverless_tx")) + .await?; + + if serverless_data.is_empty() { + return Ok(()); + } + + tracing::info!("backfilling serverless"); + + let runner_configs = ctx + .op(pegboard::ops::runner_config::get::Input { + runners: serverless_data + .iter() + .map(|key| (key.namespace_id, key.runner_name.clone())) + .collect(), + bypass_cache: true, + }) + .await?; + + for key in &serverless_data { + if !runner_configs + .iter() + .any(|rc| rc.namespace_id == key.namespace_id) + { + tracing::debug!( + namespace_id=?key.namespace_id, + runner_name=?key.runner_name, + "runner config not found, likely deleted" + ); + continue; + }; + + ctx.workflow(pegboard::workflows::runner_pool::Input { + namespace_id: key.namespace_id, + runner_name: key.runner_name.clone(), + }) + .tag("namespace_id", key.namespace_id) + .tag("runner_name", key.runner_name.clone()) + .unique() + .dispatch() + .await?; + } + + Ok(()) +} diff --git a/k8s/engine/06-rivet-engine-singleton-deployment.yaml b/k8s/engine/06-rivet-engine-singleton-deployment.yaml deleted file mode 100644 index 6d8666b351..0000000000 --- a/k8s/engine/06-rivet-engine-singleton-deployment.yaml +++ /dev/null @@ -1,79 +0,0 @@ -apiVersion: apps/v1 -kind: Deployment -metadata: - labels: - app: rivet-engine-singleton - component: singleton - service: engine - name: rivet-engine-singleton - namespace: rivet-engine -spec: - replicas: 1 - selector: - matchLabels: - app: rivet-engine-singleton - template: - metadata: - annotations: - checksum/config: REPLACE_WITH_CONFIG_CHECKSUM - cluster-autoscaler.kubernetes.io/safe-to-evict: "false" - labels: - app: rivet-engine-singleton - component: singleton - service: engine - spec: - containers: - - args: - - start - - --services - - singleton - - --services - - api-peer - env: - - name: RIVET_CONFIG_PATH - value: /etc/rivet/config.jsonc - image: rivet-engine:local - imagePullPolicy: Never - livenessProbe: - failureThreshold: 3 - httpGet: - path: /health - port: 6421 - periodSeconds: 10 - timeoutSeconds: 5 - name: rivet-engine - ports: - - containerPort: 6421 - name: api-peer - protocol: TCP - readinessProbe: - failureThreshold: 2 - httpGet: - path: /health - port: 6421 - periodSeconds: 5 - timeoutSeconds: 3 - resources: - limits: - cpu: 4000m - memory: 8Gi - requests: - cpu: 2000m - memory: 4Gi - startupProbe: - failureThreshold: 30 - httpGet: - path: /health - port: 6421 - initialDelaySeconds: 30 - periodSeconds: 10 - timeoutSeconds: 5 - volumeMounts: - - mountPath: /etc/rivet - name: config - readOnly: true - serviceAccountName: rivet-engine - volumes: - - configMap: - name: engine-config - name: config diff --git a/scripts/run/k8s/engine.sh b/scripts/run/k8s/engine.sh index b726cfa856..b84ba64465 100755 --- a/scripts/run/k8s/engine.sh +++ b/scripts/run/k8s/engine.sh @@ -54,16 +54,11 @@ kubectl apply -f 02-engine-configmap.yaml kubectl apply -f 03-rivet-engine-deployment.yaml kubectl apply -f 04-rivet-engine-service.yaml kubectl apply -f 05-rivet-engine-hpa.yaml -kubectl apply -f 06-rivet-engine-singleton-deployment.yaml # Wait for engine to be ready echo "Waiting for engine to be ready..." kubectl -n "${NAMESPACE}" wait --for=condition=ready pod -l app=rivet-engine --timeout=300s -# Wait for singleton to be ready -echo "Waiting for singleton to be ready..." -kubectl -n "${NAMESPACE}" wait --for=condition=ready pod -l app=rivet-engine-singleton --timeout=300s - echo "" echo "Deployment complete." echo ""