From 193f67a1fb3470e5a990cec815be7ebc9dc0cc46 Mon Sep 17 00:00:00 2001 From: Nathan Flurry Date: Wed, 19 Nov 2025 17:50:15 -0800 Subject: [PATCH] chore(rivetkit): fix hibernation implementation --- engine/packages/pegboard-gateway/src/lib.rs | 10 +- .../pegboard-gateway/src/shared_state.rs | 7 +- engine/sdks/typescript/runner/src/actor.ts | 108 +++++++++++- engine/sdks/typescript/runner/src/mod.ts | 159 +++++++++++++---- .../sdks/typescript/runner/src/stringify.ts | 21 ++- engine/sdks/typescript/runner/src/tunnel.ts | 161 ++++++++++-------- engine/sdks/typescript/runner/src/utils.ts | 19 +++ examples/counter/scripts/connect.ts | 4 +- .../packages/rivetkit/src/actor/driver.ts | 1 + .../src/actor/instance/connection-manager.ts | 36 ++-- .../rivetkit/src/actor/instance/mod.ts | 41 ++++- .../src/actor/instance/state-manager.ts | 69 +------- .../rivetkit/src/driver-test-suite/mod.ts | 4 +- ...bernation.ts => actor-conn-hibernation.ts} | 4 +- .../src/drivers/engine/actor-driver.ts | 25 +-- scripts/run/docker/engine-postgres.sh | 3 +- scripts/run/docker/engine-rocksdb.sh | 2 +- 17 files changed, 437 insertions(+), 237 deletions(-) rename rivetkit-typescript/packages/rivetkit/src/driver-test-suite/tests/{actor-hibernation.ts => actor-conn-hibernation.ts} (97%) diff --git a/engine/packages/pegboard-gateway/src/lib.rs b/engine/packages/pegboard-gateway/src/lib.rs index 937d144279..898f6af5ca 100644 --- a/engine/packages/pegboard-gateway/src/lib.rs +++ b/engine/packages/pegboard-gateway/src/lib.rs @@ -9,23 +9,23 @@ use pegboard::tunnel::id::{self as tunnel_id, RequestId}; use rand::Rng; use rivet_error::*; use rivet_guard_core::{ - WebSocketHandle, custom_serve::{CustomServeTrait, HibernationResult}, errors::{ServiceUnavailable, WebSocketServiceUnavailable}, - proxy_service::{ResponseBody, is_ws_hibernate}, + proxy_service::{is_ws_hibernate, ResponseBody}, request_context::RequestContext, websocket_handle::WebSocketReceiver, + WebSocketHandle, }; use rivet_runner_protocol as protocol; use rivet_util::serde::HashableMap; use std::{sync::Arc, time::Duration}; use tokio::{ - sync::{Mutex, watch}, + sync::{watch, Mutex}, task::JoinHandle, }; use tokio_tungstenite::tungstenite::{ + protocol::frame::{coding::CloseCode, CloseFrame}, Message, - protocol::frame::{CloseFrame, coding::CloseCode}, }; use crate::shared_state::{InFlightRequestHandle, SharedState}; @@ -486,7 +486,7 @@ impl CustomServeTrait for PegboardGateway { }; // Send close frame to runner if not hibernating - if lifecycle_res + if !&lifecycle_res .as_ref() .map_or_else(is_ws_hibernate, |_| false) { diff --git a/engine/packages/pegboard-gateway/src/shared_state.rs b/engine/packages/pegboard-gateway/src/shared_state.rs index a277916663..94baa44929 100644 --- a/engine/packages/pegboard-gateway/src/shared_state.rs +++ b/engine/packages/pegboard-gateway/src/shared_state.rs @@ -124,7 +124,6 @@ impl SharedState { entry.msg_tx = msg_tx; entry.drop_tx = drop_tx; entry.opened = false; - entry.message_index = 0; if entry.stopping { entry.hibernation_state = None; @@ -194,6 +193,11 @@ impl SharedState { }; hs.pending_ws_msgs.push(pending_ws_msg); + tracing::debug!( + index=current_message_index, + new_count=hs.pending_ws_msgs.len(), + "pushed pending websocket message" + ); } self.ups @@ -391,7 +395,6 @@ impl SharedState { let len_after = hs.pending_ws_msgs.len(); tracing::debug!( - request_id=?tunnel_id::request_id_to_string(&request_id), ack_index, removed_count = len_before - len_after, remaining_count = len_after, diff --git a/engine/sdks/typescript/runner/src/actor.ts b/engine/sdks/typescript/runner/src/actor.ts index 8db2c29aa7..272a69b63f 100644 --- a/engine/sdks/typescript/runner/src/actor.ts +++ b/engine/sdks/typescript/runner/src/actor.ts @@ -1,7 +1,9 @@ import type * as protocol from "@rivetkit/engine-runner-protocol"; import type { PendingRequest } from "./tunnel"; import type { WebSocketTunnelAdapter } from "./websocket-tunnel-adapter"; -import { arraysEqual } from "./utils"; +import { arraysEqual, promiseWithResolvers } from "./utils"; +import { logger } from "./log"; +import * as tunnelId from "./tunnel-id"; export interface ActorConfig { name: string; @@ -24,11 +26,28 @@ export class RunnerActor { requestId: protocol.RequestId; ws: WebSocketTunnelAdapter; }> = []; + actorStartPromise: ReturnType>; - constructor(actorId: string, generation: number, config: ActorConfig) { + /** + * If restoreHibernatingRequests has been called. This is used to assert + * that the caller is implemented correctly. + **/ + hibernationRestored: boolean = false; + + constructor( + actorId: string, + generation: number, + config: ActorConfig, + /** + * List of hibernating requests provided by the gateway on actor start. + * This represents the WebSocket connections that the gateway knows about. + **/ + public hibernatingRequests: readonly protocol.HibernatingRequest[], + ) { this.actorId = actorId; this.generation = generation; this.config = config; + this.actorStartPromise = promiseWithResolvers(); } // Pending request methods @@ -43,13 +62,78 @@ export class RunnerActor { )?.request; } - setPendingRequest( + createPendingRequest( + gatewayId: protocol.GatewayId, + requestId: protocol.RequestId, + clientMessageIndex: number, + ) { + const exists = + this.getPendingRequest(gatewayId, requestId) !== undefined; + if (exists) { + logger()?.warn({ + msg: "attempting to set pending request twice, replacing existing", + gatewayId: tunnelId.gatewayIdToString(gatewayId), + requestId: tunnelId.requestIdToString(requestId), + }); + // Delete existing pending request before adding the new one + this.deletePendingRequest(gatewayId, requestId); + } + this.pendingRequests.push({ + gatewayId, + requestId, + request: { + resolve: () => {}, + reject: () => {}, + actorId: this.actorId, + gatewayId: gatewayId, + requestId: requestId, + clientMessageIndex, + }, + }); + logger()?.debug({ + msg: "added pending request", + gatewayId: tunnelId.gatewayIdToString(gatewayId), + requestId: tunnelId.requestIdToString(requestId), + length: this.pendingRequests.length, + }); + } + + createPendingRequestWithStreamController( gatewayId: protocol.GatewayId, requestId: protocol.RequestId, - request: PendingRequest, + clientMessageIndex: number, + streamController: ReadableStreamDefaultController, ) { - this.deletePendingRequest(gatewayId, requestId); - this.pendingRequests.push({ gatewayId, requestId, request }); + const exists = + this.getPendingRequest(gatewayId, requestId) !== undefined; + if (exists) { + logger()?.warn({ + msg: "attempting to set pending request twice, replacing existing", + gatewayId: tunnelId.gatewayIdToString(gatewayId), + requestId: tunnelId.requestIdToString(requestId), + }); + // Delete existing pending request before adding the new one + this.deletePendingRequest(gatewayId, requestId); + } + this.pendingRequests.push({ + gatewayId, + requestId, + request: { + resolve: () => {}, + reject: () => {}, + actorId: this.actorId, + gatewayId: gatewayId, + requestId: requestId, + clientMessageIndex, + streamController, + }, + }); + logger()?.debug({ + msg: "added pending request with stream controller", + gatewayId: tunnelId.gatewayIdToString(gatewayId), + requestId: tunnelId.requestIdToString(requestId), + length: this.pendingRequests.length, + }); } deletePendingRequest( @@ -63,6 +147,12 @@ export class RunnerActor { ); if (index !== -1) { this.pendingRequests.splice(index, 1); + logger()?.debug({ + msg: "removed pending request", + gatewayId: tunnelId.gatewayIdToString(gatewayId), + requestId: tunnelId.requestIdToString(requestId), + length: this.pendingRequests.length, + }); } } @@ -83,7 +173,11 @@ export class RunnerActor { requestId: protocol.RequestId, ws: WebSocketTunnelAdapter, ) { - this.deleteWebSocket(gatewayId, requestId); + const exists = this.getWebSocket(gatewayId, requestId) !== undefined; + if (exists) { + logger()?.warn({ msg: "attempting to set websocket twice" }); + return; + } this.webSockets.push({ gatewayId, requestId, ws }); } diff --git a/engine/sdks/typescript/runner/src/mod.ts b/engine/sdks/typescript/runner/src/mod.ts index 815ab56207..7e9eb1f640 100644 --- a/engine/sdks/typescript/runner/src/mod.ts +++ b/engine/sdks/typescript/runner/src/mod.ts @@ -73,7 +73,7 @@ export interface RunnerConfig { * This is responsible for persisting hibernatable WebSockets immediately * (do not wait for open event). It is not time sensitive to flush the * connection state. If this fails to persist the HWS, the client's - * WebSocket will be disconnected on next wake in + * WebSocket will be disconnected on next wake in the call to * `Tunnel::restoreHibernatingRequests` since the connection entry will not * exist. * @@ -109,12 +109,13 @@ export interface RunnerConfig { * * ### Restoring Connections * - * `loadAll` will be called from `Tunnel::restoreHibernatingRequests` to - * restore this connection on the next actor wake. + * The user of this library is responsible for: + * 1. Loading all persisted hibernatable WebSocket metadata for an actor + * 2. Calling `Runner::restoreHibernatingRequests` with this metadata at + * the end of `onActorStart` * - * `restoreHibernatingRequests` is responsible for both making sure that - * new connections are registered with the actor and zombie connections are - * appropriately cleaned up. + * `restoreHibernatingRequests` will restore all connections and attach + * the appropriate event listeners. * * ### No Open Event On Restoration * @@ -145,25 +146,21 @@ export interface RunnerConfig { requestId: ArrayBuffer, request: Request, ) => boolean; - - /** - * Returns all hibernatable WebSockets that are stored for this actor. - * - * This is called on actor start. - * - * This list will be diffed with the list of hibernating requests in - * the ActorStart message. - * - * This that are connected but not loaded (i.e. were not successfully - * persisted to this actor) will be disconnected. - * - * This that are not connected but were loaded (i.e. disconnected but - * this actor has not received the event yet) will also be - * disconnected. - */ - loadAll(actorId: string): Promise; }; + /** + * Called when an actor starts. + * + * This callback is responsible for: + * 1. Initializing the actor instance + * 2. Loading all persisted hibernatable WebSocket metadata for this actor + * 3. Calling `Runner::restoreHibernatingRequests` with the loaded metadata + * to restore hibernatable WebSocket connections + * + * The actor should not be marked as "ready" until after + * `restoreHibernatingRequests` completes to ensure all hibernatable + * connections are fully restored before the actor processes new requests. + */ onActorStart: ( actorId: string, generation: number, @@ -291,7 +288,7 @@ export class Runner { } async forceStopActor(actorId: string, generation?: number) { - const actor = this.#removeActor(actorId, generation); + const actor = this.getActor(actorId, generation); if (!actor) return; // If onActorStop times out, Pegboard will handle this timeout with ACTOR_STOP_THRESHOLD_DURATION_MS @@ -308,6 +305,11 @@ export class Runner { // Close requests after onActorStop so you can send messages over the tunnel this.#tunnel?.closeActiveRequests(actor); + // Remove actor after stopping in order to ensure that we can still + // call actions on the runner. Do this before sending stopped update in + // order to ensure we don't have duplicate actors. + this.#removeActor(actorId, generation); + this.#sendActorStateUpdate(actorId, actor.generation, "stopped"); } @@ -325,14 +327,14 @@ export class Runner { getActor(actorId: string, generation?: number): RunnerActor | undefined { const actor = this.#actors.get(actorId); if (!actor) { - this.log?.error({ + this.log?.warn({ msg: "actor not found", actorId, }); return undefined; } if (generation !== undefined && actor.generation !== generation) { - this.log?.error({ + this.log?.warn({ msg: "actor generation mismatch", actorId, generation, @@ -343,6 +345,16 @@ export class Runner { return actor; } + async getAndWaitForActor( + actorId: string, + generation?: number, + ): Promise { + const actor = this.getActor(actorId, generation); + if (!actor) return; + await actor.actorStartPromise.promise; + return actor; + } + hasActor(actorId: string, generation?: number): boolean { const actor = this.#actors.get(actorId); @@ -380,6 +392,12 @@ export class Runner { this.#actors.delete(actorId); + this.log?.info({ + msg: "removed actor", + actorId, + actors: this.#actors.size, + }); + return actor; } @@ -729,7 +747,7 @@ export class Runner { this.#config.onConnected(); } else if (message.tag === "ToClientCommands") { const commands = message.val; - await this.#handleCommands(commands); + this.#handleCommands(commands); } else if (message.tag === "ToClientAckEvents") { this.#handleAckEvents(message.val); } else if (message.tag === "ToClientKvResponse") { @@ -834,7 +852,7 @@ export class Runner { }); } - async #handleCommands(commands: protocol.ToClientCommands) { + #handleCommands(commands: protocol.ToClientCommands) { this.log?.info({ msg: "received commands", commandCount: commands.length, @@ -845,7 +863,8 @@ export class Runner { // Spawn background promise this.#handleCommandStartActor(commandWrapper); } else if (commandWrapper.inner.tag === "CommandStopActor") { - await this.#handleCommandStopActor(commandWrapper); + // Spawn background promise + this.#handleCommandStopActor(commandWrapper); } else { unreachable(commandWrapper.inner); } @@ -894,6 +913,10 @@ export class Runner { } async #handleCommandStartActor(commandWrapper: protocol.CommandWrapper) { + // IMPORTANT: Make sure no async code runs before inserting #actors and + // calling addRequestToActor in order to prevent race conditions with + // subsequence commands + if (!this.#tunnel) throw new Error("missing tunnel on actor start"); const startCommand = commandWrapper.inner @@ -910,10 +933,44 @@ export class Runner { input: config.input ? new Uint8Array(config.input) : null, }; - const instance = new RunnerActor(actorId, generation, actorConfig); + const instance = new RunnerActor( + actorId, + generation, + actorConfig, + startCommand.hibernatingRequests, + ); + + const existingActor = this.#actors.get(actorId); + if (existingActor) { + this.log?.warn({ + msg: "replacing existing actor in actors map", + actorId, + existingGeneration: existingActor.generation, + newGeneration: generation, + existingPendingRequests: existingActor.pendingRequests.length, + }); + } this.#actors.set(actorId, instance); + // NOTE: We have to populate the requestToActor map BEFORE running any + // async code in order for incoming tunnel messages to wait for + // instance.actorStartPromise before processing messages + // TODO: Where is this GC'd if something fails? + for (const hr of startCommand.hibernatingRequests) { + this.#tunnel.addRequestToActor(hr.gatewayId, hr.requestId, actorId); + } + + this.log?.info({ + msg: "created actor", + actors: this.#actors.size, + actorId, + name: config.name, + key: config.key, + generation, + hibernatingRequests: startCommand.hibernatingRequests.length, + }); + this.#sendActorStateUpdate(actorId, generation, "running"); try { @@ -926,18 +983,16 @@ export class Runner { }); await this.#config.onActorStart(actorId, generation, actorConfig); - // Restore hibernating requests - await this.#tunnel.restoreHibernatingRequests( - actorId, - startCommand.hibernatingRequests, - ); + instance.actorStartPromise.resolve(); } catch (err) { this.log?.error({ - msg: "error in onactorstart for actor", + msg: "error starting runner actor", actorId, err, }); + instance.actorStartPromise.reject(err); + // TODO: Mark as crashed // Send stopped state update if start failed await this.forceStopActor(actorId, generation); @@ -1526,6 +1581,38 @@ export class Runner { ); } + /** + * Restores hibernatable WebSocket connections for an actor. + * + * This method should be called at the end of `onActorStart` after the + * actor instance is fully initialized. + * + * This method will: + * - Restore all provided hibernatable WebSocket connections + * - Attach event listeners to the restored WebSockets + * - Close any WebSocket connections that failed to restore + * + * The provided metadata list should include all hibernatable WebSockets + * that were persisted for this actor. The gateway will automatically + * close any connections that are not restored (i.e., not included in + * this list). + * + * **Important:** This method must be called after `onActorStart` completes + * and before marking the actor as "ready" to ensure all hibernatable + * connections are fully restored. + * + * @param actorId - The ID of the actor to restore connections for + * @param metaEntries - Array of hibernatable WebSocket metadata to restore + */ + async restoreHibernatingRequests( + actorId: string, + metaEntries: HibernatingWebSocketMetadata[], + ) { + if (!this.#tunnel) + throw new Error("missing tunnel to restore hibernating requests"); + await this.#tunnel.restoreHibernatingRequests(actorId, metaEntries); + } + getServerlessInitPacket(): string | undefined { if (!this.runnerId) return undefined; diff --git a/engine/sdks/typescript/runner/src/stringify.ts b/engine/sdks/typescript/runner/src/stringify.ts index 4f150fecc0..fc9722f4b5 100644 --- a/engine/sdks/typescript/runner/src/stringify.ts +++ b/engine/sdks/typescript/runner/src/stringify.ts @@ -111,13 +111,16 @@ export function stringifyToClientTunnelMessageKind( export function stringifyCommand(command: protocol.Command): string { switch (command.tag) { case "CommandStartActor": { - const { actorId, generation, config } = command.val; + const { actorId, generation, config, hibernatingRequests } = command.val; const keyStr = config.key === null ? "null" : `"${config.key}"`; const inputStr = config.input === null ? "null" : stringifyArrayBuffer(config.input); - return `CommandStartActor{actorId: "${actorId}", generation: ${generation}, config: {name: "${config.name}", key: ${keyStr}, createTs: ${stringifyBigInt(config.createTs)}, input: ${inputStr}}}`; + const hibernatingRequestsStr = hibernatingRequests.length > 0 + ? `[${hibernatingRequests.map((hr) => `{gatewayId: ${stringifyArrayBuffer(hr.gatewayId)}, requestId: ${stringifyArrayBuffer(hr.requestId)}}`).join(", ")}]` + : "[]"; + return `CommandStartActor{actorId: "${actorId}", generation: ${generation}, config: {name: "${config.name}", key: ${keyStr}, createTs: ${stringifyBigInt(config.createTs)}, input: ${inputStr}}, hibernatingRequests: ${hibernatingRequestsStr}}`; } case "CommandStopActor": { const { actorId, generation } = command.val; @@ -190,14 +193,18 @@ export function stringifyEventWrapper(wrapper: protocol.EventWrapper): string { export function stringifyToServer(message: protocol.ToServer): string { switch (message.tag) { case "ToServerInit": { - const { name, version, totalSlots, lastCommandIdx, metadata } = + const { name, version, totalSlots, lastCommandIdx, prepopulateActorNames, metadata } = message.val; const lastCommandIdxStr = lastCommandIdx === null ? "null" : stringifyBigInt(lastCommandIdx); + const prepopulateActorNamesStr = + prepopulateActorNames === null + ? "null" + : `Map(${prepopulateActorNames.size})`; const metadataStr = metadata === null ? "null" : `"${metadata}"`; - return `ToServerInit{name: "${name}", version: ${version}, totalSlots: ${totalSlots}, lastCommandIdx: ${lastCommandIdxStr}, metadata: ${metadataStr}}`; + return `ToServerInit{name: "${name}", version: ${version}, totalSlots: ${totalSlots}, lastCommandIdx: ${lastCommandIdxStr}, prepopulateActorNames: ${prepopulateActorNamesStr}, metadata: ${metadataStr}}`; } case "ToServerEvents": { const events = message.val; @@ -234,10 +241,8 @@ export function stringifyToClient(message: protocol.ToClient): string { switch (message.tag) { case "ToClientInit": { const { runnerId, lastEventIdx, metadata } = message.val; - const runnerLostThreshold = metadata?.runnerLostThreshold - ? stringifyBigInt(metadata.runnerLostThreshold) - : "null"; - return `ToClientInit{runnerId: "${runnerId}", lastEventIdx: ${stringifyBigInt(lastEventIdx)}, runnerLostThreshold: ${runnerLostThreshold}}`; + const metadataStr = `{runnerLostThreshold: ${stringifyBigInt(metadata.runnerLostThreshold)}}`; + return `ToClientInit{runnerId: "${runnerId}", lastEventIdx: ${stringifyBigInt(lastEventIdx)}, metadata: ${metadataStr}}`; } case "ToClientClose": return "ToClientClose"; diff --git a/engine/sdks/typescript/runner/src/tunnel.ts b/engine/sdks/typescript/runner/src/tunnel.ts index 5dbb49741a..93f8db5c31 100644 --- a/engine/sdks/typescript/runner/src/tunnel.ts +++ b/engine/sdks/typescript/runner/src/tunnel.ts @@ -104,25 +104,34 @@ export class Tunnel { async restoreHibernatingRequests( actorId: string, - hibernatingRequests: readonly protocol.HibernatingRequest[], + metaEntries: HibernatingWebSocketMetadata[], ) { + const actor = this.#runner.getActor(actorId); + if (!actor) { + throw new Error( + `Actor ${actorId} not found for restoring hibernating requests`, + ); + } + + if (actor.hibernationRestored) { + throw new Error( + `Actor ${actorId} already restored hibernating requests`, + ); + } + this.log?.debug({ msg: "restoring hibernating requests", actorId, - requests: hibernatingRequests.length, + requests: actor.hibernatingRequests.length, }); - // Load all persisted metadata - const metaEntries = - await this.#runner.config.hibernatableWebSocket.loadAll(actorId); - // Track all background operations const backgroundOperations: Promise[] = []; // Process connected WebSockets let connectedButNotLoadedCount = 0; let restoredCount = 0; - for (const { gatewayId, requestId } of hibernatingRequests) { + for (const { gatewayId, requestId } of actor.hibernatingRequests) { const requestIdStr = tunnelId.requestIdToString(requestId); const meta = metaEntries.find( (entry) => @@ -133,7 +142,7 @@ export class Tunnel { if (!meta) { // Connected but not loaded (not persisted) - close it // - // This may happen if + // This may happen if the metadata was not successfully persisted this.log?.warn({ msg: "closing websocket that is not persisted", requestId: requestIdStr, @@ -176,18 +185,11 @@ export class Tunnel { // Create a PendingRequest entry to track the message index const actor = this.#runner.getActor(actorId); if (actor) { - actor.pendingRequests.push({ + actor.createPendingRequest( gatewayId, requestId, - request: { - resolve: () => {}, - reject: () => {}, - actorId: actorId, - gatewayId: gatewayId, - requestId: requestId, - clientMessageIndex: meta.clientMessageIndex, - }, - }); + meta.clientMessageIndex, + ); } this.log?.info({ @@ -223,7 +225,7 @@ export class Tunnel { let loadedButNotConnectedCount = 0; for (const meta of metaEntries) { const requestIdStr = tunnelId.requestIdToString(meta.requestId); - const isConnected = hibernatingRequests.some( + const isConnected = actor.hibernatingRequests.some( (req) => arraysEqual(req.gatewayId, meta.gatewayId) && arraysEqual(req.requestId, meta.requestId), @@ -277,6 +279,9 @@ export class Tunnel { // Wait for all background operations to complete before finishing await Promise.allSettled(backgroundOperations); + // Mark restoration as complete + actor.hibernationRestored = true; + this.log?.info({ msg: "restored hibernatable websockets", actorId, @@ -369,7 +374,7 @@ export class Tunnel { } actor.setWebSocket(gatewayId, requestId, adapter); - this.#addRequestToActor(gatewayId, requestId, actorId); + this.addRequestToActor(gatewayId, requestId, actorId); // Call WebSocket handler. This handler will add event listeners // for `open`, etc. @@ -389,7 +394,7 @@ export class Tunnel { return adapter; } - #addRequestToActor( + addRequestToActor( gatewayId: GatewayId, requestId: RequestId, actorId: string, @@ -439,6 +444,16 @@ export class Tunnel { return actor; } + async getAndWaitForRequestActor( + gatewayId: GatewayId, + requestId: RequestId, + ): Promise { + const actor = this.getRequestActor(gatewayId, requestId); + if (!actor) return; + await actor.actorStartPromise.promise; + return actor; + } + #sendMessage( gatewayId: GatewayId, requestId: RequestId, @@ -455,11 +470,16 @@ export class Tunnel { } // Get or initialize message index for this request + // + // We don't have to wait for the actor to start since we're not calling + // any callbacks on the actor + const gatewayIdStr = tunnelId.gatewayIdToString(gatewayId); const requestIdStr = tunnelId.requestIdToString(requestId); const actor = this.getRequestActor(gatewayId, requestId); if (!actor) { this.log?.warn({ msg: "cannot send tunnel message, actor not found", + gatewayId: gatewayIdStr, requestId: requestIdStr, }); return; @@ -475,6 +495,8 @@ export class Tunnel { // No pending request this.log?.warn({ msg: "missing pending request for send message, defaulting to message index 0", + gatewayId: gatewayIdStr, + requestId: requestIdStr, }); clientMessageIndex = 0; } @@ -488,9 +510,11 @@ export class Tunnel { const messageIdStr = tunnelId.messageIdToString(messageId); this.log?.debug({ - msg: "send tunnel msg", - requestId: requestIdStr, + msg: "sending tunnel msg", messageId: messageIdStr, + gatewayId: gatewayIdStr, + requestId: requestIdStr, + messageIndex: clientMessageIndex, message: stringifyToServerTunnelMessageKind(messageKind), }); @@ -514,10 +538,7 @@ export class Tunnel { for (const entry of actor.pendingRequests) { entry.request.reject(new Error(`Actor ${actorId} stopped`)); if (entry.gatewayId && entry.requestId) { - this.#removeRequestToActor( - entry.gatewayId, - entry.requestId, - ); + this.#removeRequestToActor(entry.gatewayId, entry.requestId); } } @@ -576,12 +597,15 @@ export class Tunnel { const gatewayId = messageIdParts.gatewayId; const requestId = messageIdParts.requestId; + const gatewayIdStr = tunnelId.gatewayIdToString(gatewayId); const requestIdStr = tunnelId.requestIdToString(requestId); const messageIdStr = tunnelId.messageIdToString(message.messageId); this.log?.debug({ msg: "receive tunnel msg", - requestId: requestIdStr, messageId: messageIdStr, + gatewayId: gatewayIdStr, + requestId: requestIdStr, + messageIndex: messageIdParts.messageIndex, message: stringifyToClientTunnelMessageKind(message.messageKind), }); @@ -611,7 +635,7 @@ export class Tunnel { ); break; case "ToClientWebSocketMessage": { - this.#handleWebSocketMessage( + await this.#handleWebSocketMessage( gatewayId, requestId, messageIdParts.messageIndex, @@ -641,7 +665,7 @@ export class Tunnel { ) { // Track this request for the actor const requestIdStr = tunnelId.requestIdToString(requestId); - const actor = this.#runner.getActor(req.actorId); + const actor = await this.#runner.getAndWaitForActor(req.actorId); if (!actor) { this.log?.warn({ msg: "actor does not exist in handleRequestStart, request will leak", @@ -652,7 +676,7 @@ export class Tunnel { } // Add to request-to-actor mapping - this.#addRequestToActor(gatewayId, requestId, req.actorId); + this.addRequestToActor(gatewayId, requestId, req.actorId); try { // Convert headers map to Headers object @@ -674,23 +698,22 @@ export class Tunnel { const stream = new ReadableStream({ start: (controller) => { // Store controller for chunks - const existing = - actor.getPendingRequest(gatewayId, requestId); + const existing = actor.getPendingRequest( + gatewayId, + requestId, + ); if (existing) { existing.streamController = controller; existing.actorId = req.actorId; existing.gatewayId = gatewayId; existing.requestId = requestId; } else { - actor.setPendingRequest(gatewayId, requestId, { - resolve: () => {}, - reject: () => {}, - streamController: controller, - actorId: req.actorId, - gatewayId: gatewayId, - requestId: requestId, - clientMessageIndex: 0, - }); + actor.createPendingRequestWithStreamController( + gatewayId, + requestId, + 0, + controller, + ); } }, }); @@ -718,14 +741,7 @@ export class Tunnel { } else { // Non-streaming request // Create a pending request entry to track messageIndex for the response - actor.setPendingRequest(gatewayId, requestId, { - resolve: () => {}, - reject: () => {}, - actorId: req.actorId, - gatewayId: gatewayId, - requestId: requestId, - clientMessageIndex: 0, - }); + actor.createPendingRequest(gatewayId, requestId, 0); const response = await this.#fetch( req.actorId, @@ -769,8 +785,10 @@ export class Tunnel { requestId: RequestId, chunk: protocol.ToClientRequestChunk, ) { - const requestIdStr = tunnelId.requestIdToString(requestId); - const actor = this.getRequestActor(gatewayId, requestId); + const actor = await this.getAndWaitForRequestActor( + gatewayId, + requestId, + ); if (actor) { const pending = actor.getPendingRequest(gatewayId, requestId); if (pending?.streamController) { @@ -785,8 +803,10 @@ export class Tunnel { } async #handleRequestAbort(gatewayId: GatewayId, requestId: RequestId) { - const requestIdStr = tunnelId.requestIdToString(requestId); - const actor = this.getRequestActor(gatewayId, requestId); + const actor = await this.getAndWaitForRequestActor( + gatewayId, + requestId, + ); if (actor) { const pending = actor.getPendingRequest(gatewayId, requestId); if (pending?.streamController) { @@ -892,7 +912,7 @@ export class Tunnel { const requestIdStr = tunnelId.requestIdToString(requestId); // Validate actor exists - const actor = this.#runner.getActor(open.actorId); + const actor = await this.#runner.getAndWaitForActor(open.actorId); if (!actor) { this.log?.warn({ msg: "ignoring websocket for unknown actor", @@ -939,7 +959,7 @@ export class Tunnel { const canHibernate = this.#runner.config.hibernatableWebSocket.canHibernate( actor.actorId, - gatewayId, + gatewayId, requestId, request, ); @@ -963,14 +983,7 @@ export class Tunnel { ); // Create a PendingRequest entry to track the message index - actor.setPendingRequest(gatewayId, requestId, { - resolve: () => {}, - reject: () => {}, - actorId: actor.actorId, - gatewayId: gatewayId, - requestId: requestId, - clientMessageIndex: 0, - }); + actor.createPendingRequest(gatewayId, requestId, 0); // Open the WebSocket after `config.socket` so (a) the event // handlers can be added and (b) any errors in `config.websocket` @@ -1006,17 +1019,16 @@ export class Tunnel { } } - #handleWebSocketMessage( + async #handleWebSocketMessage( gatewayId: GatewayId, requestId: RequestId, serverMessageIndex: number, msg: protocol.ToClientWebSocketMessage, ) { - // NOTE: This method cannot be async in order to ensure in-order - // message processing. - - const requestIdStr = tunnelId.requestIdToString(requestId); - const actor = this.getRequestActor(gatewayId, requestId); + const actor = await this.getAndWaitForRequestActor( + gatewayId, + requestId, + ); if (actor) { const adapter = actor.getWebSocket(gatewayId, requestId); if (adapter) { @@ -1058,6 +1070,9 @@ export class Tunnel { throw new Error("invalid websocket ack index"); // Get the actor to find the gatewayId + // + // We don't have to wait for the actor to start since we're not calling + // any callbacks on the actor const actor = this.getRequestActor(gatewayId, requestId); if (!actor) { this.log?.warn({ @@ -1091,8 +1106,10 @@ export class Tunnel { requestId: RequestId, close: protocol.ToClientWebSocketClose, ) { - const requestIdStr = tunnelId.requestIdToString(requestId); - const actor = this.getRequestActor(gatewayId, requestId); + const actor = await this.getAndWaitForRequestActor( + gatewayId, + requestId, + ); if (actor) { const adapter = actor.getWebSocket(gatewayId, requestId); if (adapter) { diff --git a/engine/sdks/typescript/runner/src/utils.ts b/engine/sdks/typescript/runner/src/utils.ts index f9c1278f3e..6cd0cdd114 100644 --- a/engine/sdks/typescript/runner/src/utils.ts +++ b/engine/sdks/typescript/runner/src/utils.ts @@ -131,3 +131,22 @@ export function arraysEqual(a: ArrayBuffer, b: ArrayBuffer): boolean { } return true; } + +/** + * Polyfill for Promise.withResolvers(). + * + * This is specifically for Cloudflare Workers. Their implementation of Promise.withResolvers does not work correctly. + */ +export function promiseWithResolvers(): { + promise: Promise; + resolve: (value: T | PromiseLike) => void; + reject: (reason?: any) => void; +} { + let resolve!: (value: T | PromiseLike) => void; + let reject!: (reason?: any) => void; + const promise = new Promise((res, rej) => { + resolve = res; + reject = rej; + }); + return { promise, resolve, reject }; +} diff --git a/examples/counter/scripts/connect.ts b/examples/counter/scripts/connect.ts index 805b957ada..aa8d659547 100644 --- a/examples/counter/scripts/connect.ts +++ b/examples/counter/scripts/connect.ts @@ -8,8 +8,8 @@ async function main() { counter.on("newCount", (count: number) => console.log("Event:", count)); - for (let i = 0; i < 5; i++) { - const out = await counter.increment(5); + while (true) { + const out = await counter.increment(1); console.log("RPC:", out); await new Promise((resolve) => setTimeout(resolve, 1000)); diff --git a/rivetkit-typescript/packages/rivetkit/src/actor/driver.ts b/rivetkit-typescript/packages/rivetkit/src/actor/driver.ts index b4d9fec093..3651c1d6d6 100644 --- a/rivetkit-typescript/packages/rivetkit/src/actor/driver.ts +++ b/rivetkit-typescript/packages/rivetkit/src/actor/driver.ts @@ -79,6 +79,7 @@ export interface ActorDriver { /** Extra properties to add to logs for each actor. */ getExtraActorLogParams?(): Record; + onBeforeActorStart?(actor: AnyActorInstance): Promise; onCreateConn?(conn: AnyConn): void; onDestroyConn?(conn: AnyConn): void; onBeforePersistConn?(conn: AnyConn): void; diff --git a/rivetkit-typescript/packages/rivetkit/src/actor/instance/connection-manager.ts b/rivetkit-typescript/packages/rivetkit/src/actor/instance/connection-manager.ts index 91644254a5..6cd1f69af3 100644 --- a/rivetkit-typescript/packages/rivetkit/src/actor/instance/connection-manager.ts +++ b/rivetkit-typescript/packages/rivetkit/src/actor/instance/connection-manager.ts @@ -279,10 +279,6 @@ export class ConnectionManager< // Remove from tracking this.#connections.delete(conn.id); - if (conn.isHibernatable) { - this.markConnWithPersistChanged(conn); - } - this.#actor.rLog.debug({ msg: "removed conn", connId: conn.id }); // Notify driver about connection removal @@ -321,19 +317,27 @@ export class ConnectionManager< } } + // Remove from connsWithPersistChanged after onDisconnect to handle any + // state changes made during the disconnect callback. Disconnected connections + // are removed from KV storage via kvBatchDelete below, not through the + // normal persist save flow, so they should not trigger persist saves. + this.#connsWithPersistChanged.delete(conn.id); + // Remove from KV storage - const key = makeConnKey(conn.id); - try { - await this.#actor.driver.kvBatchDelete(this.#actor.id, [key]); - this.#actor.rLog.debug({ - msg: "removed connection from KV", - connId: conn.id, - }); - } catch (err) { - this.#actor.rLog.error({ - msg: "kvBatchDelete failed for conn", - err: stringifyError(err), - }); + if (conn.isHibernatable) { + const key = makeConnKey(conn.id); + try { + await this.#actor.driver.kvBatchDelete(this.#actor.id, [key]); + this.#actor.rLog.debug({ + msg: "removed connection from KV", + connId: conn.id, + }); + } catch (err) { + this.#actor.rLog.error({ + msg: "kvBatchDelete failed for conn", + err: stringifyError(err), + }); + } } } diff --git a/rivetkit-typescript/packages/rivetkit/src/actor/instance/mod.ts b/rivetkit-typescript/packages/rivetkit/src/actor/instance/mod.ts index 7ad78d8c1a..2fd4ab79f7 100644 --- a/rivetkit-typescript/packages/rivetkit/src/actor/instance/mod.ts +++ b/rivetkit-typescript/packages/rivetkit/src/actor/instance/mod.ts @@ -60,6 +60,7 @@ export type { SaveStateOptions }; enum CanSleep { Yes, NotReady, + NotStarted, ActiveConns, ActiveHonoHttpRequests, } @@ -108,7 +109,18 @@ export class ActorInstance { #rLog!: Logger; // MARK: - Lifecycle State + /** + * If the core actor initiation has set up. + * + * Almost all actions on this actor will throw an error if false. + **/ #ready = false; + /** + * If the actor has fully started. + * + * The only purpose of this is to prevent sleeping until started. + */ + #started = false; #sleepCalled = false; #destroyCalled = false; #stopCalled = false; @@ -362,9 +374,22 @@ export class ActorInstance { // Mark as ready this.#ready = true; - this.#rLog.info({ msg: "actor ready" }); - // Start sleep timer + // Finish up any remaining initiation + // + // Do this after #ready = true since this can call any actor callbacks + // (which require #assertReady) + await this.driver.onBeforeActorStart?.(this); + + // Mark as started + // + // We do this after onBeforeActorStart to prevent the actor from going + // to sleep before finishing setup + this.#started = true; + this.#rLog.info({ msg: "actor started" }); + + // Start sleep timer after setting #started since this affects the + // timer this.resetSleepTimer(); // Trigger any pending alarms @@ -390,7 +415,7 @@ export class ActorInstance { } this.#stopCalled = true; this.#rLog.info({ - msg: "[STOP] actor stopping - setting stopCalled=true", + msg: "setting stopCalled=true", mode, }); @@ -423,9 +448,9 @@ export class ActorInstance { ); // Clear timeouts and save state - this.#rLog.info({ msg: "[STOP] Clearing pending save timeouts" }); + this.#rLog.info({ msg: "clearing pending save timeouts" }); this.stateManager.clearPendingSaveTimeout(); - this.#rLog.info({ msg: "[STOP] Saving state immediately" }); + this.#rLog.info({ msg: "saving state immediately" }); await this.stateManager.saveState({ immediate: true, allowStoppingState: true, @@ -563,7 +588,7 @@ export class ActorInstance { actionName: string, args: unknown[], ): Promise { - invariant(this.#ready, "executing action before ready"); + this.assertReady(); if (!(actionName in this.#config.actions)) { this.#rLog.warn({ msg: "action does not exist", actionName }); @@ -1052,11 +1077,15 @@ export class ActorInstance { #canSleep(): CanSleep { if (!this.#ready) return CanSleep.NotReady; + if (!this.#started) return CanSleep.NotReady; if (this.#activeHonoHttpRequests > 0) return CanSleep.ActiveHonoHttpRequests; for (const _conn of this.connectionManager.connections.values()) { + // TODO: Add back + // if (!_conn.isHibernatable) { return CanSleep.ActiveConns; + // } } return CanSleep.Yes; diff --git a/rivetkit-typescript/packages/rivetkit/src/actor/instance/state-manager.ts b/rivetkit-typescript/packages/rivetkit/src/actor/instance/state-manager.ts index ce6086595f..2136a66844 100644 --- a/rivetkit-typescript/packages/rivetkit/src/actor/instance/state-manager.ts +++ b/rivetkit-typescript/packages/rivetkit/src/actor/instance/state-manager.ts @@ -437,8 +437,10 @@ export class StateManager { requestId: tunnelId.requestIdToString( hibernatableDataRaw.requestId, ), - serverMessageIndex: hibernatableDataRaw.serverMessageIndex, - clientMessageIndex: hibernatableDataRaw.clientMessageIndex, + serverMessageIndex: + hibernatableDataRaw.serverMessageIndex, + clientMessageIndex: + hibernatableDataRaw.clientMessageIndex, hasState: hibernatableDataRaw.state !== undefined, }); @@ -482,69 +484,6 @@ export class StateManager { msg: "kvBatchPut completed successfully", }); - // Test: Check if KV data is immediately available after write - try { - // Try kvListAll first - if ( - "kvListAll" in this.#actorDriver && - typeof this.#actorDriver.kvListAll === "function" - ) { - const kvEntries = await ( - this.#actorDriver as any - ).kvListAll(this.#actor.id); - this.#actor.rLog.info({ - msg: "KV verification with kvListAll immediately after write", - actorId: this.#actor.id, - entriesFound: kvEntries.length, - keys: kvEntries.map( - ([k]: [Uint8Array, Uint8Array]) => - new TextDecoder().decode(k), - ), - }); - } else if ( - "kvListPrefix" in this.#actorDriver && - typeof this.#actorDriver.kvListPrefix === "function" - ) { - // Fallback to kvListPrefix if kvListAll doesn't exist - const kvEntries = await ( - this.#actorDriver as any - ).kvListPrefix(this.#actor.id, new Uint8Array()); - this.#actor.rLog.info({ - msg: "KV verification with kvListPrefix immediately after write", - actorId: this.#actor.id, - entriesFound: kvEntries.length, - keys: kvEntries.map( - ([k]: [Uint8Array, Uint8Array]) => - new TextDecoder().decode(k), - ), - }); - } - } catch (verifyError) { - this.#actor.rLog.warn({ - msg: "Failed to verify KV after write", - error: stringifyError(verifyError), - }); - } - - // List KV to verify what was written - // TODO: Re-enable when kvList is implemented on ActorDriver - // try { - // const kvList = await this.#actorDriver.kvList(this.#actor.id); - // this.#actor.rLog.info({ - // msg: "KV list after write", - // keys: kvList.map((k: Uint8Array) => { - // const keyStr = new TextDecoder().decode(k); - // return keyStr; - // }), - // keysCount: kvList.length, - // }); - // } catch (listError) { - // this.#actor.rLog.warn({ - // msg: "failed to list KV after write", - // error: stringifyError(listError), - // }); - // } - // Notify driver after persisting connections if (this.#actorDriver.onAfterPersistConn) { for (const conn of connections) { diff --git a/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/mod.ts b/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/mod.ts index fb8b8fc306..ea18dfd626 100644 --- a/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/mod.ts +++ b/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/mod.ts @@ -22,7 +22,7 @@ import { runActorDestroyTests } from "./tests/actor-destroy"; import { runActorDriverTests } from "./tests/actor-driver"; import { runActorErrorHandlingTests } from "./tests/actor-error-handling"; import { runActorHandleTests } from "./tests/actor-handle"; -import { runActorHibernationTests } from "./tests/actor-hibernation"; +import { runActorConnHibernationTests } from "./tests/actor-conn-hibernation"; import { runActorInlineClientTests } from "./tests/actor-inline-client"; import { runActorInspectorTests } from "./tests/actor-inspector"; import { runActorMetadataTests } from "./tests/actor-metadata"; @@ -105,7 +105,7 @@ export function runDriverTests( runActorConnStateTests(driverTestConfig); - runActorHibernationTests(driverTestConfig); + runActorConnHibernationTests(driverTestConfig); runActorDestroyTests(driverTestConfig); diff --git a/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/tests/actor-hibernation.ts b/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/tests/actor-conn-hibernation.ts similarity index 97% rename from rivetkit-typescript/packages/rivetkit/src/driver-test-suite/tests/actor-hibernation.ts rename to rivetkit-typescript/packages/rivetkit/src/driver-test-suite/tests/actor-conn-hibernation.ts index d01431c501..672c0e21cb 100644 --- a/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/tests/actor-hibernation.ts +++ b/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/tests/actor-conn-hibernation.ts @@ -3,9 +3,9 @@ import { HIBERNATION_SLEEP_TIMEOUT } from "../../../fixtures/driver-test-suite/h import type { DriverTestConfig } from "../mod"; import { setupDriverTest, waitFor } from "../utils"; -export function runActorHibernationTests(driverTestConfig: DriverTestConfig) { +export function runActorConnHibernationTests(driverTestConfig: DriverTestConfig) { describe.skipIf(driverTestConfig.skip?.hibernation)( - "Actor Hibernation Tests", + "Connection Hibernation", () => { test("basic conn hibernation", async (c) => { const { client } = await setupDriverTest(c, driverTestConfig); diff --git a/rivetkit-typescript/packages/rivetkit/src/drivers/engine/actor-driver.ts b/rivetkit-typescript/packages/rivetkit/src/drivers/engine/actor-driver.ts index bed6a47f9f..b06d41e533 100644 --- a/rivetkit-typescript/packages/rivetkit/src/drivers/engine/actor-driver.ts +++ b/rivetkit-typescript/packages/rivetkit/src/drivers/engine/actor-driver.ts @@ -9,10 +9,7 @@ import type { Context as HonoContext } from "hono"; import { streamSSE } from "hono/streaming"; import { WSContext, type WSContextInit } from "hono/ws"; import invariant from "invariant"; -import { - type AnyConn, - CONN_STATE_MANAGER_SYMBOL, -} from "@/actor/conn/mod"; +import { type AnyConn, CONN_STATE_MANAGER_SYMBOL } from "@/actor/conn/mod"; import { lookupInRegistry } from "@/actor/definition"; import { KEYS } from "@/actor/instance/kv"; import { deserializeActorKey } from "@/actor/keys"; @@ -45,10 +42,7 @@ import { import { buildActorNames, type RegistryConfig } from "@/registry/config"; import type { RunnerConfig } from "@/registry/run-config"; import { getEndpoint } from "@/remote-manager-driver/api-utils"; -import type { RequestId } from "@/schemas/actor-persist/mod"; import { - arrayBuffersEqual, - assertUnreachable, type LongTimeoutHandle, promiseWithResolvers, setLongTimeout, @@ -170,7 +164,6 @@ export class EngineActorDriver implements ActorDriver { websocket: this.#runnerWebSocket.bind(this), hibernatableWebSocket: { canHibernate: this.#hwsCanHibernate.bind(this), - loadAll: this.#hwsLoadAll.bind(this), }, onActorStart: this.#runnerOnActorStart.bind(this), onActorStop: this.#runnerOnActorStop.bind(this), @@ -473,10 +466,6 @@ export class EngineActorDriver implements ActorDriver { "unknown", // TODO: Add regions ); - // Resolve promise if waiting - handler.actorStartPromise?.resolve(); - handler.actorStartPromise = undefined; - logger().debug({ msg: "runner actor started", actorId, name, key }); } @@ -867,6 +856,18 @@ export class EngineActorDriver implements ActorDriver { .toArray(); } + async onBeforeActorStart(actor: AnyActorInstance): Promise { + // Resolve promise if waiting + const handler = this.#actors.get(actor.id); + invariant(handler, "missing actor handler in onBeforeActorReady"); + handler.actorStartPromise?.resolve(); + handler.actorStartPromise = undefined; + + // Restore hibernating requests + const metaEntries = await this.#hwsLoadAll(actor.id); + await this.#runner.restoreHibernatingRequests(actor.id, metaEntries); + } + onCreateConn(conn: AnyConn) { const hibernatable = conn[CONN_STATE_MANAGER_SYMBOL].hibernatableData; if (!hibernatable) return; diff --git a/scripts/run/docker/engine-postgres.sh b/scripts/run/docker/engine-postgres.sh index 054cd02841..54875c4496 100755 --- a/scripts/run/docker/engine-postgres.sh +++ b/scripts/run/docker/engine-postgres.sh @@ -32,4 +32,5 @@ cd "${REPO_ROOT}" RIVET__POSTGRES__URL=postgres://postgres:postgres@localhost:5432/postgres \ RUST_LOG=debug \ -cargo run --bin rivet-engine -- start "$@" | tee /tmp/rivet-engine.log +RUST_LOG_TARGET=1 \ +cargo run --bin rivet-engine -- start "$@" 2>&1 | tee /tmp/rivet-engine.log diff --git a/scripts/run/docker/engine-rocksdb.sh b/scripts/run/docker/engine-rocksdb.sh index fa553cafbd..09358020ba 100755 --- a/scripts/run/docker/engine-rocksdb.sh +++ b/scripts/run/docker/engine-rocksdb.sh @@ -13,5 +13,5 @@ RIVET__PEGBOARD__BASE_RETRY_TIMEOUT="100" \ RIVET__PEGBOARD__RESCHEDULE_BACKOFF_MAX_EXPONENT="1" \ RIVET__PEGBOARD__RUNNER_ELIGIBLE_THRESHOLD="5000" \ RIVET__PEGBOARD__RUNNER_LOST_THRESHOLD="7000" \ -cargo run --bin rivet-engine -- start "$@" | tee -i /tmp/rivet-engine.log +cargo run --bin rivet-engine -- start "$@" 2>&1 | tee -i /tmp/rivet-engine.log