diff --git a/crates/service/src/routes/status.rs b/crates/service/src/routes/status.rs index c036426a6..46c1b5958 100644 --- a/crates/service/src/routes/status.rs +++ b/crates/service/src/routes/status.rs @@ -1,16 +1,16 @@ // Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs. // SPDX-License-Identifier: Apache-2.0 -use std::{collections::HashSet, sync::LazyLock}; +use std::{collections::HashSet, sync::LazyLock, time::Instant}; -use async_graphql_axum::GraphQLRequest; -use axum::{extract::State, response::IntoResponse, Json}; -use graphql::graphql_parser::query as q; -use serde_json::{json, Map, Value}; -use thegraph_graphql_http::{ - http::request::{IntoRequestParameters, RequestParameters}, - http_client::{ReqwestExt, ResponseError}, +use axum::{ + body::Bytes, + extract::State, + http::{header::CONTENT_TYPE, StatusCode}, + response::IntoResponse, }; +use graphql::graphql_parser::query as q; +use serde::Deserialize; use crate::{error::SubgraphServiceError, service::GraphNodeState}; @@ -33,38 +33,28 @@ static SUPPORTED_ROOT_FIELDS: LazyLock> = LazyLock::new(|| .collect() }); -struct WrappedGraphQLRequest(async_graphql::Request); - -impl IntoRequestParameters for WrappedGraphQLRequest { - fn into_request_parameters(self) -> RequestParameters { - RequestParameters { - query: self.0.query.into(), - operation_name: self.0.operation_name, - variables: Map::from_iter(self.0.variables.iter().map(|(name, value)| { - ( - name.as_str().to_string(), - value.clone().into_json().unwrap(), - ) - })), - extensions: Map::from_iter( - self.0 - .extensions - .0 - .into_iter() - .map(|(name, value)| (name, value.into_json().unwrap())), - ), - } - } +/// Minimal struct to extract the query string for validation +#[derive(Deserialize)] +struct StatusRequest { + query: String, } -// Custom middleware function to process the request before reaching the main handler +/// Forwards GraphQL status queries to graph-node after validating allowed root fields. +/// +/// This handler uses direct HTTP forwarding for optimal performance, avoiding +/// the overhead of multiple serialization/deserialization layers. pub async fn status( State(state): State, - request: GraphQLRequest, + body: Bytes, ) -> Result { - let request = request.into_inner(); + let start = Instant::now(); + + // Parse request to extract query for validation + let request: StatusRequest = serde_json::from_slice(&body) + .map_err(|e| SubgraphServiceError::InvalidStatusQuery(e.into()))?; - let query: q::Document = q::parse_query(request.query.as_str()) + // Parse and validate GraphQL query fields + let query: q::Document = q::parse_query(&request.query) .map_err(|e| SubgraphServiceError::InvalidStatusQuery(e.into()))?; let root_fields = query @@ -102,19 +92,287 @@ pub async fn status( )); } - let result = state + tracing::debug!( + elapsed_ms = start.elapsed().as_millis(), + "Status query validated" + ); + + // Forward request to graph-node directly + let forward_start = Instant::now(); + let response = state .graph_node_client .post(state.graph_node_status_url.clone()) - .send_graphql::(WrappedGraphQLRequest(request)) + .header(CONTENT_TYPE, "application/json") + .body(body) + .send() .await .map_err(|e| SubgraphServiceError::StatusQueryError(e.into()))?; - result - .map(|data| Json(json!({"data": data}))) - .or_else(|e| match e { - ResponseError::Failure { errors } => Ok(Json(json!({ - "errors": errors, - }))), - ResponseError::Empty => todo!(), - }) + tracing::debug!( + elapsed_ms = forward_start.elapsed().as_millis(), + "Graph-node response received" + ); + + // Check for HTTP errors + let status = response.status(); + if !status.is_success() { + let text = response + .text() + .await + .unwrap_or_else(|_| "Failed to read error response".to_string()); + return Err(SubgraphServiceError::StatusQueryError(anyhow::anyhow!( + "Graph node returned {status}: {text}" + ))); + } + + // Return the response body as-is + let response_body = response + .text() + .await + .map_err(|e| SubgraphServiceError::StatusQueryError(e.into()))?; + + Ok(( + StatusCode::OK, + [(CONTENT_TYPE, "application/json")], + response_body, + )) +} + +#[cfg(test)] +mod tests { + use axum::{ + body::{to_bytes, Body}, + http::Request, + routing::post, + Router, + }; + use reqwest::Url; + use serde_json::json; + use tower::ServiceExt; + use wiremock::{ + matchers::{method, path}, + Mock, MockServer, ResponseTemplate, + }; + + use super::*; + use crate::service::GraphNodeState; + + async fn setup_test_router(mock_server: &MockServer) -> Router { + let graph_node_status_url: Url = mock_server + .uri() + .parse::() + .unwrap() + .join("/status") + .unwrap(); + + let state = GraphNodeState { + graph_node_client: reqwest::Client::new(), + graph_node_status_url, + graph_node_query_base_url: mock_server.uri().parse().unwrap(), + }; + + Router::new().route("/status", post(status).with_state(state)) + } + + #[tokio::test] + async fn test_valid_query_forwards_to_graph_node() { + let mock_server = MockServer::start().await; + + Mock::given(method("POST")) + .and(path("/status")) + .respond_with(ResponseTemplate::new(200).set_body_json(json!({ + "data": { + "indexingStatuses": [ + {"subgraph": "Qm123", "health": "healthy"} + ] + } + }))) + .mount(&mock_server) + .await; + + let app = setup_test_router(&mock_server).await; + + let request = Request::builder() + .method("POST") + .uri("/status") + .header("content-type", "application/json") + .body(Body::from( + json!({"query": "{indexingStatuses {subgraph health}}"}).to_string(), + )) + .unwrap(); + + let response = app.oneshot(request).await.unwrap(); + + assert_eq!(response.status(), StatusCode::OK); + + let body = to_bytes(response.into_body(), usize::MAX).await.unwrap(); + let json: serde_json::Value = serde_json::from_slice(&body).unwrap(); + + assert_eq!( + json, + json!({ + "data": { + "indexingStatuses": [ + {"subgraph": "Qm123", "health": "healthy"} + ] + } + }) + ); + } + + #[tokio::test] + async fn test_unsupported_root_field_returns_bad_request() { + let mock_server = MockServer::start().await; + let app = setup_test_router(&mock_server).await; + + let request = Request::builder() + .method("POST") + .uri("/status") + .header("content-type", "application/json") + .body(Body::from( + json!({"query": "{_meta {block {number}}}"}).to_string(), + )) + .unwrap(); + + let response = app.oneshot(request).await.unwrap(); + + assert_eq!(response.status(), StatusCode::BAD_REQUEST); + + let body = to_bytes(response.into_body(), usize::MAX).await.unwrap(); + let json: serde_json::Value = serde_json::from_slice(&body).unwrap(); + + assert!(json["message"] + .as_str() + .unwrap() + .contains("Unsupported status query fields")); + } + + #[tokio::test] + async fn test_malformed_json_returns_bad_request() { + let mock_server = MockServer::start().await; + let app = setup_test_router(&mock_server).await; + + let request = Request::builder() + .method("POST") + .uri("/status") + .header("content-type", "application/json") + .body(Body::from("not valid json")) + .unwrap(); + + let response = app.oneshot(request).await.unwrap(); + + assert_eq!(response.status(), StatusCode::BAD_REQUEST); + } + + #[tokio::test] + async fn test_invalid_graphql_syntax_returns_bad_request() { + let mock_server = MockServer::start().await; + let app = setup_test_router(&mock_server).await; + + let request = Request::builder() + .method("POST") + .uri("/status") + .header("content-type", "application/json") + .body(Body::from( + json!({"query": "{invalid graphql syntax {"}).to_string(), + )) + .unwrap(); + + let response = app.oneshot(request).await.unwrap(); + + assert_eq!(response.status(), StatusCode::BAD_REQUEST); + } + + #[tokio::test] + async fn test_graph_node_error_returns_bad_gateway() { + let mock_server = MockServer::start().await; + + Mock::given(method("POST")) + .and(path("/status")) + .respond_with(ResponseTemplate::new(500).set_body_string("Internal Server Error")) + .mount(&mock_server) + .await; + + let app = setup_test_router(&mock_server).await; + + let request = Request::builder() + .method("POST") + .uri("/status") + .header("content-type", "application/json") + .body(Body::from( + json!({"query": "{indexingStatuses {subgraph}}"}).to_string(), + )) + .unwrap(); + + let response = app.oneshot(request).await.unwrap(); + + assert_eq!(response.status(), StatusCode::BAD_GATEWAY); + } + + #[tokio::test] + async fn test_multiple_supported_root_fields() { + let mock_server = MockServer::start().await; + + Mock::given(method("POST")) + .and(path("/status")) + .respond_with(ResponseTemplate::new(200).set_body_json(json!({ + "data": { + "indexingStatuses": [], + "chains": [] + } + }))) + .mount(&mock_server) + .await; + + let app = setup_test_router(&mock_server).await; + + let request = Request::builder() + .method("POST") + .uri("/status") + .header("content-type", "application/json") + .body(Body::from( + json!({"query": "{indexingStatuses {subgraph} chains {network}}"}).to_string(), + )) + .unwrap(); + + let response = app.oneshot(request).await.unwrap(); + + assert_eq!(response.status(), StatusCode::OK); + } + + #[tokio::test] + async fn test_graphql_errors_forwarded_from_graph_node() { + let mock_server = MockServer::start().await; + + Mock::given(method("POST")) + .and(path("/status")) + .respond_with(ResponseTemplate::new(200).set_body_json(json!({ + "errors": [{ + "message": "Type `Query` has no field `_meta`", + "locations": [{"line": 1, "column": 2}] + }] + }))) + .mount(&mock_server) + .await; + + let app = setup_test_router(&mock_server).await; + + let request = Request::builder() + .method("POST") + .uri("/status") + .header("content-type", "application/json") + .body(Body::from( + json!({"query": "{indexingStatuses {subgraph}}"}).to_string(), + )) + .unwrap(); + + let response = app.oneshot(request).await.unwrap(); + + assert_eq!(response.status(), StatusCode::OK); + + let body = to_bytes(response.into_body(), usize::MAX).await.unwrap(); + let json: serde_json::Value = serde_json::from_slice(&body).unwrap(); + + assert!(json["errors"].is_array()); + } }