Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions engine/packages/guard-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,17 @@ hyper-util = { workspace = true, features = ["full"] }
indoc.workspace = true
lazy_static.workspace = true
moka = { workspace = true, features = ["future"] }
pegboard.workspace = true
rand.workspace = true
regex.workspace = true
rivet-api-builder.workspace = true
rivet-config.workspace = true
rivet-error.workspace = true
rivet-metrics.workspace = true
rivet-runner-protocol.workspace = true
rivet-runtime.workspace = true
rivet-util.workspace = true
rustls.workspace = true
rustls-pemfile.workspace = true
rustls.workspace = true
serde_json.workspace = true
serde.workspace = true
tokio-rustls.workspace = true
Expand Down
8 changes: 4 additions & 4 deletions engine/packages/guard-core/src/custom_serve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use async_trait::async_trait;
use bytes::Bytes;
use http_body_util::Full;
use hyper::{Request, Response};
use pegboard::tunnel::id::RequestId;
use rivet_runner_protocol as protocol;
use tokio_tungstenite::tungstenite::protocol::frame::CloseFrame;

use crate::WebSocketHandle;
Expand All @@ -23,7 +23,7 @@ pub trait CustomServeTrait: Send + Sync {
&self,
req: Request<Full<Bytes>>,
request_context: &mut RequestContext,
request_id: RequestId,
request_id: protocol::RequestId,
) -> Result<Response<ResponseBody>>;

/// Handle a WebSocket connection after upgrade. Supports connection retries.
Expand All @@ -34,7 +34,7 @@ pub trait CustomServeTrait: Send + Sync {
_path: &str,
_request_context: &mut RequestContext,
// Identifies the websocket across retries.
_unique_request_id: RequestId,
_unique_request_id: protocol::RequestId,
// True if this websocket is reconnecting after hibernation.
_after_hibernation: bool,
) -> Result<Option<CloseFrame>> {
Expand All @@ -45,7 +45,7 @@ pub trait CustomServeTrait: Send + Sync {
async fn handle_websocket_hibernation(
&self,
_websocket: WebSocketHandle,
_unique_request_id: RequestId,
_unique_request_id: protocol::RequestId,
) -> Result<HibernationResult> {
bail!("service does not support websocket hibernation");
}
Expand Down
1 change: 0 additions & 1 deletion engine/packages/guard-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ pub mod websocket_handle;

pub use cert_resolver::CertResolverFn;
pub use custom_serve::CustomServeTrait;
pub use pegboard::tunnel::id::{RequestId, generate_request_id};
pub use proxy_service::{
CacheKeyFn, MiddlewareFn, ProxyService, ProxyState, RouteTarget, RoutingFn, RoutingOutput,
};
Expand Down
22 changes: 11 additions & 11 deletions engine/packages/guard-core/src/proxy_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use rivet_metrics::KeyValue;
use rivet_util::Id;
use serde_json;

use pegboard::tunnel::id::{RequestId, generate_request_id};
use rivet_runner_protocol as protocol;
use std::{
borrow::Cow,
collections::{HashMap as StdHashMap, HashSet},
Expand Down Expand Up @@ -350,7 +350,7 @@ pub struct ProxyState {
route_cache: RouteCache,
rate_limiters: Cache<(Id, std::net::IpAddr), Arc<Mutex<RateLimiter>>>,
in_flight_counters: Cache<(Id, std::net::IpAddr), Arc<Mutex<InFlightCounter>>>,
inflight_requests: Arc<Mutex<HashSet<RequestId>>>,
in_flight_requests: Arc<Mutex<HashSet<protocol::RequestId>>>,
port_type: PortType,
clickhouse_inserter: Option<clickhouse_inserter::ClickHouseInserterHandle>,
tasks: Arc<TaskGroup>,
Expand Down Expand Up @@ -379,7 +379,7 @@ impl ProxyState {
.max_capacity(10_000)
.time_to_live(PROXY_STATE_CACHE_TTL)
.build(),
inflight_requests: Arc::new(Mutex::new(HashSet::new())),
in_flight_requests: Arc::new(Mutex::new(HashSet::new())),
port_type,
clickhouse_inserter,
tasks: TaskGroup::new(),
Expand Down Expand Up @@ -603,7 +603,7 @@ impl ProxyState {
ip_addr: std::net::IpAddr,
actor_id: &Option<Id>,
headers: &hyper::HeaderMap,
) -> Result<Option<RequestId>> {
) -> Result<Option<protocol::RequestId>> {
// Check in-flight limit if actor_id is present
if let Some(actor_id) = *actor_id {
// Get actor-specific middleware config
Expand Down Expand Up @@ -648,7 +648,7 @@ impl ProxyState {
&self,
ip_addr: std::net::IpAddr,
actor_id: &Option<Id>,
request_id: RequestId,
request_id: protocol::RequestId,
) {
// Release in-flight counter if actor_id is present
if let Some(actor_id) = *actor_id {
Expand All @@ -660,17 +660,17 @@ impl ProxyState {
}

// Release request ID
let mut requests = self.inflight_requests.lock().await;
let mut requests = self.in_flight_requests.lock().await;
requests.remove(&request_id);
}

/// Generate a unique request ID that is not currently in flight
async fn generate_unique_request_id(&self) -> anyhow::Result<RequestId> {
async fn generate_unique_request_id(&self) -> Result<protocol::RequestId> {
const MAX_TRIES: u32 = 100;
let mut requests = self.inflight_requests.lock().await;
let mut requests = self.in_flight_requests.lock().await;

for attempt in 0..MAX_TRIES {
let request_id = generate_request_id();
let request_id = protocol::util::generate_request_id();

// Check if this ID is already in use
if !requests.contains(&request_id) {
Expand All @@ -688,7 +688,7 @@ impl ProxyState {
);
}

anyhow::bail!(
bail!(
"failed to generate unique request id after {} attempts",
MAX_TRIES
);
Expand Down Expand Up @@ -2144,7 +2144,7 @@ impl ProxyService {
.release_in_flight(client_ip, &actor_id, request_id)
.await;

anyhow::Ok(())
Ok(())
}
.instrument(tracing::info_span!("handle_ws_task_custom_serve")),
);
Expand Down
1 change: 1 addition & 0 deletions engine/packages/guard/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ rivet-guard-core.workspace = true
rivet-logs.workspace = true
rivet-metrics.workspace = true
rivet-pools.workspace = true
rivet-runner-protocol.workspace = true
rivet-runtime.workspace = true
rustls-pemfile.workspace = true
rustls.workspace = true
Expand Down
4 changes: 2 additions & 2 deletions engine/packages/guard/src/routing/api_public.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ use bytes::Bytes;
use gas::prelude::*;
use http_body_util::{BodyExt, Full};
use hyper::{Request, Response};
use pegboard::tunnel::id::RequestId;
use rivet_guard_core::proxy_service::{ResponseBody, RoutingOutput};
use rivet_guard_core::{CustomServeTrait, request_context::RequestContext};
use rivet_runner_protocol as protocol;
use tower::Service;

struct ApiPublicService {
Expand All @@ -21,7 +21,7 @@ impl CustomServeTrait for ApiPublicService {
&self,
req: Request<Full<Bytes>>,
_request_context: &mut RequestContext,
_request_id: RequestId,
_request_id: protocol::RequestId,
) -> Result<Response<ResponseBody>> {
// Clone the router to get a mutable service
let mut service = self.router.clone();
Expand Down
11 changes: 5 additions & 6 deletions engine/packages/pegboard-gateway/src/keepalive_task.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use anyhow::Result;
use gas::prelude::*;
use pegboard::tunnel::id as tunnel_id;
use pegboard::tunnel::id::{GatewayId, RequestId};
use rand::Rng;
use rivet_runner_protocol as protocol;
use std::time::Duration;
use tokio::sync::watch;

Expand All @@ -17,8 +16,8 @@ pub async fn task(
shared_state: SharedState,
ctx: StandaloneCtx,
actor_id: Id,
gateway_id: GatewayId,
request_id: RequestId,
gateway_id: protocol::GatewayId,
request_id: protocol::RequestId,
mut keepalive_abort_rx: watch::Receiver<()>,
) -> Result<LifecycleResult> {
let mut ping_interval = tokio::time::interval(Duration::from_millis(
Expand All @@ -44,8 +43,8 @@ pub async fn task(

tracing::debug!(
%actor_id,
gateway_id=%tunnel_id::gateway_id_to_string(&gateway_id),
request_id=%tunnel_id::request_id_to_string(&request_id),
gateway_id=%protocol::util::id_to_string(&gateway_id),
request_id=%protocol::util::id_to_string(&request_id),
"updating hws keepalive"
);

Expand Down
15 changes: 7 additions & 8 deletions engine/packages/pegboard-gateway/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use futures_util::TryStreamExt;
use gas::prelude::*;
use http_body_util::{BodyExt, Full};
use hyper::{Request, Response, StatusCode};
use pegboard::tunnel::id::{self as tunnel_id, RequestId};
use rivet_error::*;
use rivet_guard_core::{
custom_serve::{CustomServeTrait, HibernationResult},
Expand Down Expand Up @@ -86,7 +85,7 @@ impl CustomServeTrait for PegboardGateway {
&self,
req: Request<Full<Bytes>>,
_request_context: &mut RequestContext,
request_id: RequestId,
request_id: protocol::RequestId,
) -> Result<Response<ResponseBody>> {
// Use the actor ID from the gateway instance
let actor_id = self.actor_id.to_string();
Expand Down Expand Up @@ -213,7 +212,7 @@ impl CustomServeTrait for PegboardGateway {
}
} else {
tracing::warn!(
request_id=%tunnel_id::request_id_to_string(&request_id),
request_id=%protocol::util::id_to_string(&request_id),
"received no message response during request init",
);
break;
Expand Down Expand Up @@ -268,14 +267,14 @@ impl CustomServeTrait for PegboardGateway {
Ok(response)
}

#[tracing::instrument(skip_all, fields(actor_id=?self.actor_id, runner_id=?self.runner_id, request_id=%tunnel_id::request_id_to_string(&request_id)))]
#[tracing::instrument(skip_all, fields(actor_id=?self.actor_id, runner_id=?self.runner_id, request_id=%protocol::util::id_to_string(&request_id)))]
async fn handle_websocket(
&self,
client_ws: WebSocketHandle,
headers: &hyper::HeaderMap,
_path: &str,
_request_context: &mut RequestContext,
request_id: RequestId,
request_id: protocol::RequestId,
after_hibernation: bool,
) -> Result<Option<CloseFrame>> {
// Use the actor ID from the gateway instance
Expand Down Expand Up @@ -354,7 +353,7 @@ impl CustomServeTrait for PegboardGateway {
}
} else {
tracing::warn!(
request_id=%tunnel_id::request_id_to_string(&request_id),
request_id=%protocol::util::id_to_string(&request_id),
"received no message response during ws init",
);
break;
Expand Down Expand Up @@ -572,11 +571,11 @@ impl CustomServeTrait for PegboardGateway {
}
}

#[tracing::instrument(skip_all, fields(actor_id=?self.actor_id, request_id=%tunnel_id::request_id_to_string(&request_id)))]
#[tracing::instrument(skip_all, fields(actor_id=?self.actor_id, request_id=%protocol::util::id_to_string(&request_id)))]
async fn handle_websocket_hibernation(
&self,
client_ws: WebSocketHandle,
request_id: RequestId,
request_id: protocol::RequestId,
) -> Result<HibernationResult> {
// Immediately rewake if we have pending messages
if self
Expand Down
Loading
Loading