11// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs.
22// SPDX-License-Identifier: Apache-2.0
33
4- use std:: { collections:: HashSet , sync:: LazyLock } ;
4+ use std:: { collections:: HashSet , sync:: LazyLock , time :: Instant } ;
55
6- use async_graphql_axum:: GraphQLRequest ;
7- use axum:: { extract:: State , response:: IntoResponse , Json } ;
8- use graphql:: graphql_parser:: query as q;
9- use serde_json:: { json, Map , Value } ;
10- use thegraph_graphql_http:: {
11- http:: request:: { IntoRequestParameters , RequestParameters } ,
12- http_client:: { ReqwestExt , ResponseError } ,
6+ use axum:: {
7+ body:: Bytes ,
8+ extract:: State ,
9+ http:: { header:: CONTENT_TYPE , StatusCode } ,
10+ response:: IntoResponse ,
1311} ;
12+ use graphql:: graphql_parser:: query as q;
13+ use serde:: Deserialize ;
1414
1515use crate :: { error:: SubgraphServiceError , service:: GraphNodeState } ;
1616
@@ -33,38 +33,28 @@ static SUPPORTED_ROOT_FIELDS: LazyLock<HashSet<&'static str>> = LazyLock::new(||
3333 . collect ( )
3434} ) ;
3535
36- struct WrappedGraphQLRequest ( async_graphql:: Request ) ;
37-
38- impl IntoRequestParameters for WrappedGraphQLRequest {
39- fn into_request_parameters ( self ) -> RequestParameters {
40- RequestParameters {
41- query : self . 0 . query . into ( ) ,
42- operation_name : self . 0 . operation_name ,
43- variables : Map :: from_iter ( self . 0 . variables . iter ( ) . map ( |( name, value) | {
44- (
45- name. as_str ( ) . to_string ( ) ,
46- value. clone ( ) . into_json ( ) . unwrap ( ) ,
47- )
48- } ) ) ,
49- extensions : Map :: from_iter (
50- self . 0
51- . extensions
52- . 0
53- . into_iter ( )
54- . map ( |( name, value) | ( name, value. into_json ( ) . unwrap ( ) ) ) ,
55- ) ,
56- }
57- }
36+ /// Minimal struct to extract the query string for validation
37+ #[ derive( Deserialize ) ]
38+ struct StatusRequest {
39+ query : String ,
5840}
5941
60- // Custom middleware function to process the request before reaching the main handler
42+ /// Forwards GraphQL status queries to graph-node after validating allowed root fields.
43+ ///
44+ /// This handler uses direct HTTP forwarding for optimal performance, avoiding
45+ /// the overhead of multiple serialization/deserialization layers.
6146pub async fn status (
6247 State ( state) : State < GraphNodeState > ,
63- request : GraphQLRequest ,
48+ body : Bytes ,
6449) -> Result < impl IntoResponse , SubgraphServiceError > {
65- let request = request. into_inner ( ) ;
50+ let start = Instant :: now ( ) ;
51+
52+ // Parse request to extract query for validation
53+ let request: StatusRequest = serde_json:: from_slice ( & body)
54+ . map_err ( |e| SubgraphServiceError :: InvalidStatusQuery ( e. into ( ) ) ) ?;
6655
67- let query: q:: Document < String > = q:: parse_query ( request. query . as_str ( ) )
56+ // Parse and validate GraphQL query fields
57+ let query: q:: Document < String > = q:: parse_query ( & request. query )
6858 . map_err ( |e| SubgraphServiceError :: InvalidStatusQuery ( e. into ( ) ) ) ?;
6959
7060 let root_fields = query
@@ -102,19 +92,287 @@ pub async fn status(
10292 ) ) ;
10393 }
10494
105- let result = state
95+ tracing:: debug!(
96+ elapsed_ms = start. elapsed( ) . as_millis( ) ,
97+ "Status query validated"
98+ ) ;
99+
100+ // Forward request to graph-node directly
101+ let forward_start = Instant :: now ( ) ;
102+ let response = state
106103 . graph_node_client
107104 . post ( state. graph_node_status_url . clone ( ) )
108- . send_graphql :: < Value > ( WrappedGraphQLRequest ( request) )
105+ . header ( CONTENT_TYPE , "application/json" )
106+ . body ( body)
107+ . send ( )
109108 . await
110109 . map_err ( |e| SubgraphServiceError :: StatusQueryError ( e. into ( ) ) ) ?;
111110
112- result
113- . map ( |data| Json ( json ! ( { "data" : data} ) ) )
114- . or_else ( |e| match e {
115- ResponseError :: Failure { errors } => Ok ( Json ( json ! ( {
116- "errors" : errors,
117- } ) ) ) ,
118- ResponseError :: Empty => todo ! ( ) ,
119- } )
111+ tracing:: debug!(
112+ elapsed_ms = forward_start. elapsed( ) . as_millis( ) ,
113+ "Graph-node response received"
114+ ) ;
115+
116+ // Check for HTTP errors
117+ let status = response. status ( ) ;
118+ if !status. is_success ( ) {
119+ let text = response
120+ . text ( )
121+ . await
122+ . unwrap_or_else ( |_| "Failed to read error response" . to_string ( ) ) ;
123+ return Err ( SubgraphServiceError :: StatusQueryError ( anyhow:: anyhow!(
124+ "Graph node returned {status}: {text}"
125+ ) ) ) ;
126+ }
127+
128+ // Return the response body as-is
129+ let response_body = response
130+ . text ( )
131+ . await
132+ . map_err ( |e| SubgraphServiceError :: StatusQueryError ( e. into ( ) ) ) ?;
133+
134+ Ok ( (
135+ StatusCode :: OK ,
136+ [ ( CONTENT_TYPE , "application/json" ) ] ,
137+ response_body,
138+ ) )
139+ }
140+
141+ #[ cfg( test) ]
142+ mod tests {
143+ use axum:: {
144+ body:: { to_bytes, Body } ,
145+ http:: Request ,
146+ routing:: post,
147+ Router ,
148+ } ;
149+ use reqwest:: Url ;
150+ use serde_json:: json;
151+ use tower:: ServiceExt ;
152+ use wiremock:: {
153+ matchers:: { method, path} ,
154+ Mock , MockServer , ResponseTemplate ,
155+ } ;
156+
157+ use super :: * ;
158+ use crate :: service:: GraphNodeState ;
159+
160+ async fn setup_test_router ( mock_server : & MockServer ) -> Router {
161+ let graph_node_status_url: Url = mock_server
162+ . uri ( )
163+ . parse :: < Url > ( )
164+ . unwrap ( )
165+ . join ( "/status" )
166+ . unwrap ( ) ;
167+
168+ let state = GraphNodeState {
169+ graph_node_client : reqwest:: Client :: new ( ) ,
170+ graph_node_status_url,
171+ graph_node_query_base_url : mock_server. uri ( ) . parse ( ) . unwrap ( ) ,
172+ } ;
173+
174+ Router :: new ( ) . route ( "/status" , post ( status) . with_state ( state) )
175+ }
176+
177+ #[ tokio:: test]
178+ async fn test_valid_query_forwards_to_graph_node ( ) {
179+ let mock_server = MockServer :: start ( ) . await ;
180+
181+ Mock :: given ( method ( "POST" ) )
182+ . and ( path ( "/status" ) )
183+ . respond_with ( ResponseTemplate :: new ( 200 ) . set_body_json ( json ! ( {
184+ "data" : {
185+ "indexingStatuses" : [
186+ { "subgraph" : "Qm123" , "health" : "healthy" }
187+ ]
188+ }
189+ } ) ) )
190+ . mount ( & mock_server)
191+ . await ;
192+
193+ let app = setup_test_router ( & mock_server) . await ;
194+
195+ let request = Request :: builder ( )
196+ . method ( "POST" )
197+ . uri ( "/status" )
198+ . header ( "content-type" , "application/json" )
199+ . body ( Body :: from (
200+ json ! ( { "query" : "{indexingStatuses {subgraph health}}" } ) . to_string ( ) ,
201+ ) )
202+ . unwrap ( ) ;
203+
204+ let response = app. oneshot ( request) . await . unwrap ( ) ;
205+
206+ assert_eq ! ( response. status( ) , StatusCode :: OK ) ;
207+
208+ let body = to_bytes ( response. into_body ( ) , usize:: MAX ) . await . unwrap ( ) ;
209+ let json: serde_json:: Value = serde_json:: from_slice ( & body) . unwrap ( ) ;
210+
211+ assert_eq ! (
212+ json,
213+ json!( {
214+ "data" : {
215+ "indexingStatuses" : [
216+ { "subgraph" : "Qm123" , "health" : "healthy" }
217+ ]
218+ }
219+ } )
220+ ) ;
221+ }
222+
223+ #[ tokio:: test]
224+ async fn test_unsupported_root_field_returns_bad_request ( ) {
225+ let mock_server = MockServer :: start ( ) . await ;
226+ let app = setup_test_router ( & mock_server) . await ;
227+
228+ let request = Request :: builder ( )
229+ . method ( "POST" )
230+ . uri ( "/status" )
231+ . header ( "content-type" , "application/json" )
232+ . body ( Body :: from (
233+ json ! ( { "query" : "{_meta {block {number}}}" } ) . to_string ( ) ,
234+ ) )
235+ . unwrap ( ) ;
236+
237+ let response = app. oneshot ( request) . await . unwrap ( ) ;
238+
239+ assert_eq ! ( response. status( ) , StatusCode :: BAD_REQUEST ) ;
240+
241+ let body = to_bytes ( response. into_body ( ) , usize:: MAX ) . await . unwrap ( ) ;
242+ let json: serde_json:: Value = serde_json:: from_slice ( & body) . unwrap ( ) ;
243+
244+ assert ! ( json[ "message" ]
245+ . as_str( )
246+ . unwrap( )
247+ . contains( "Unsupported status query fields" ) ) ;
248+ }
249+
250+ #[ tokio:: test]
251+ async fn test_malformed_json_returns_bad_request ( ) {
252+ let mock_server = MockServer :: start ( ) . await ;
253+ let app = setup_test_router ( & mock_server) . await ;
254+
255+ let request = Request :: builder ( )
256+ . method ( "POST" )
257+ . uri ( "/status" )
258+ . header ( "content-type" , "application/json" )
259+ . body ( Body :: from ( "not valid json" ) )
260+ . unwrap ( ) ;
261+
262+ let response = app. oneshot ( request) . await . unwrap ( ) ;
263+
264+ assert_eq ! ( response. status( ) , StatusCode :: BAD_REQUEST ) ;
265+ }
266+
267+ #[ tokio:: test]
268+ async fn test_invalid_graphql_syntax_returns_bad_request ( ) {
269+ let mock_server = MockServer :: start ( ) . await ;
270+ let app = setup_test_router ( & mock_server) . await ;
271+
272+ let request = Request :: builder ( )
273+ . method ( "POST" )
274+ . uri ( "/status" )
275+ . header ( "content-type" , "application/json" )
276+ . body ( Body :: from (
277+ json ! ( { "query" : "{invalid graphql syntax {" } ) . to_string ( ) ,
278+ ) )
279+ . unwrap ( ) ;
280+
281+ let response = app. oneshot ( request) . await . unwrap ( ) ;
282+
283+ assert_eq ! ( response. status( ) , StatusCode :: BAD_REQUEST ) ;
284+ }
285+
286+ #[ tokio:: test]
287+ async fn test_graph_node_error_returns_bad_gateway ( ) {
288+ let mock_server = MockServer :: start ( ) . await ;
289+
290+ Mock :: given ( method ( "POST" ) )
291+ . and ( path ( "/status" ) )
292+ . respond_with ( ResponseTemplate :: new ( 500 ) . set_body_string ( "Internal Server Error" ) )
293+ . mount ( & mock_server)
294+ . await ;
295+
296+ let app = setup_test_router ( & mock_server) . await ;
297+
298+ let request = Request :: builder ( )
299+ . method ( "POST" )
300+ . uri ( "/status" )
301+ . header ( "content-type" , "application/json" )
302+ . body ( Body :: from (
303+ json ! ( { "query" : "{indexingStatuses {subgraph}}" } ) . to_string ( ) ,
304+ ) )
305+ . unwrap ( ) ;
306+
307+ let response = app. oneshot ( request) . await . unwrap ( ) ;
308+
309+ assert_eq ! ( response. status( ) , StatusCode :: BAD_GATEWAY ) ;
310+ }
311+
312+ #[ tokio:: test]
313+ async fn test_multiple_supported_root_fields ( ) {
314+ let mock_server = MockServer :: start ( ) . await ;
315+
316+ Mock :: given ( method ( "POST" ) )
317+ . and ( path ( "/status" ) )
318+ . respond_with ( ResponseTemplate :: new ( 200 ) . set_body_json ( json ! ( {
319+ "data" : {
320+ "indexingStatuses" : [ ] ,
321+ "chains" : [ ]
322+ }
323+ } ) ) )
324+ . mount ( & mock_server)
325+ . await ;
326+
327+ let app = setup_test_router ( & mock_server) . await ;
328+
329+ let request = Request :: builder ( )
330+ . method ( "POST" )
331+ . uri ( "/status" )
332+ . header ( "content-type" , "application/json" )
333+ . body ( Body :: from (
334+ json ! ( { "query" : "{indexingStatuses {subgraph} chains {network}}" } ) . to_string ( ) ,
335+ ) )
336+ . unwrap ( ) ;
337+
338+ let response = app. oneshot ( request) . await . unwrap ( ) ;
339+
340+ assert_eq ! ( response. status( ) , StatusCode :: OK ) ;
341+ }
342+
343+ #[ tokio:: test]
344+ async fn test_graphql_errors_forwarded_from_graph_node ( ) {
345+ let mock_server = MockServer :: start ( ) . await ;
346+
347+ Mock :: given ( method ( "POST" ) )
348+ . and ( path ( "/status" ) )
349+ . respond_with ( ResponseTemplate :: new ( 200 ) . set_body_json ( json ! ( {
350+ "errors" : [ {
351+ "message" : "Type `Query` has no field `_meta`" ,
352+ "locations" : [ { "line" : 1 , "column" : 2 } ]
353+ } ]
354+ } ) ) )
355+ . mount ( & mock_server)
356+ . await ;
357+
358+ let app = setup_test_router ( & mock_server) . await ;
359+
360+ let request = Request :: builder ( )
361+ . method ( "POST" )
362+ . uri ( "/status" )
363+ . header ( "content-type" , "application/json" )
364+ . body ( Body :: from (
365+ json ! ( { "query" : "{indexingStatuses {subgraph}}" } ) . to_string ( ) ,
366+ ) )
367+ . unwrap ( ) ;
368+
369+ let response = app. oneshot ( request) . await . unwrap ( ) ;
370+
371+ assert_eq ! ( response. status( ) , StatusCode :: OK ) ;
372+
373+ let body = to_bytes ( response. into_body ( ) , usize:: MAX ) . await . unwrap ( ) ;
374+ let json: serde_json:: Value = serde_json:: from_slice ( & body) . unwrap ( ) ;
375+
376+ assert ! ( json[ "errors" ] . is_array( ) ) ;
377+ }
120378}
0 commit comments