From 0b4d9f51e3dd938df372fd64eab330bfc1f63a84 Mon Sep 17 00:00:00 2001 From: Konstantin Burkalev Date: Fri, 28 Nov 2025 18:08:47 +0200 Subject: [PATCH 1/2] add lastRefreshTime to load result in openapi spec --- packages/cubejs-api-gateway/openspec.yml | 2 ++ rust/cubesql/cubeclient/src/models/v1_load_result.rs | 3 +++ 2 files changed, 5 insertions(+) diff --git a/packages/cubejs-api-gateway/openspec.yml b/packages/cubejs-api-gateway/openspec.yml index f83b0642b32af..07515fc0cded4 100644 --- a/packages/cubejs-api-gateway/openspec.yml +++ b/packages/cubejs-api-gateway/openspec.yml @@ -341,6 +341,8 @@ components: type: "array" items: type: "object" + lastRefreshTime: + type: "string" V1Error: type: "object" required: diff --git a/rust/cubesql/cubeclient/src/models/v1_load_result.rs b/rust/cubesql/cubeclient/src/models/v1_load_result.rs index b0610626d6d41..54aa48ffcad12 100644 --- a/rust/cubesql/cubeclient/src/models/v1_load_result.rs +++ b/rust/cubesql/cubeclient/src/models/v1_load_result.rs @@ -21,6 +21,8 @@ pub struct V1LoadResult { pub data: Vec, #[serde(rename = "refreshKeyValues", skip_serializing_if = "Option::is_none")] pub refresh_key_values: Option>, + #[serde(rename = "lastRefreshTime", skip_serializing_if = "Option::is_none")] + pub last_refresh_time: Option, } impl V1LoadResult { @@ -33,6 +35,7 @@ impl V1LoadResult { annotation: Box::new(annotation), data, refresh_key_values: None, + last_refresh_time: None, } } } From 9666978cb389d933fd9098efff109766b7c7064e Mon Sep 17 00:00:00 2001 From: Konstantin Burkalev Date: Mon, 1 Dec 2025 13:29:52 +0200 Subject: [PATCH 2/2] feat(backend-native): expose lastRefreshTime via /cubesql API --- .../cubejs-backend-native/src/node_export.rs | 7 ++ .../cubejs-backend-native/src/orchestrator.rs | 2 + .../cubejs-backend-native/src/transport.rs | 69 +++++++++++++++---- .../cubesql/src/compile/engine/df/scan.rs | 18 +++-- rust/cubesql/cubesql/src/compile/test/mod.rs | 7 +- rust/cubesql/cubesql/src/transport/service.rs | 14 ++-- 6 files changed, 91 insertions(+), 26 deletions(-) diff --git a/packages/cubejs-backend-native/src/node_export.rs b/packages/cubejs-backend-native/src/node_export.rs index 094eba5c2493d..2e3b63ce2d9ac 100644 --- a/packages/cubejs-backend-native/src/node_export.rs +++ b/packages/cubejs-backend-native/src/node_export.rs @@ -327,6 +327,13 @@ async fn handle_sql_query( let mut schema_response = Map::new(); schema_response.insert("schema".into(), columns_json); + if let Some(last_refresh_time) = stream_schema.metadata().get("lastRefreshTime") { + schema_response.insert( + "lastRefreshTime".into(), + serde_json::Value::String(last_refresh_time.clone()), + ); + } + write_jsonl_message( channel.clone(), stream_methods.write.clone(), diff --git a/packages/cubejs-backend-native/src/orchestrator.rs b/packages/cubejs-backend-native/src/orchestrator.rs index 5ab606766b5ea..28a128df6eb94 100644 --- a/packages/cubejs-backend-native/src/orchestrator.rs +++ b/packages/cubejs-backend-native/src/orchestrator.rs @@ -37,6 +37,7 @@ pub struct ResultWrapper { transform_data: TransformDataRequest, data: Arc, transformed_data: Option, + pub last_refresh_time: Option, } impl ResultWrapper { @@ -114,6 +115,7 @@ impl ResultWrapper { transform_data: transform_request, data: query_result, transformed_data: None, + last_refresh_time: None, }) } diff --git a/packages/cubejs-backend-native/src/transport.rs b/packages/cubejs-backend-native/src/transport.rs index eaf048d126763..9040aaa0901aa 100644 --- a/packages/cubejs-backend-native/src/transport.rs +++ b/packages/cubejs-backend-native/src/transport.rs @@ -5,6 +5,7 @@ use std::fmt::Display; use crate::auth::NativeSQLAuthContext; use crate::channel::{call_raw_js_with_channel_as_callback, NodeSqlGenerator, ValueFromJs}; +use crate::node_obj_deserializer::JsValueDeserializer; use crate::node_obj_serializer::NodeObjSerializer; use crate::orchestrator::ResultWrapper; use crate::{ @@ -12,13 +13,15 @@ use crate::{ stream::call_js_with_stream_as_callback, }; use async_trait::async_trait; +use cubeorchestrator::query_result_transform::RequestResultData; +use cubesql::compile::arrow::datatypes::Schema; use cubesql::compile::engine::df::scan::{ - convert_transport_response, transform_response, CacheMode, MemberField, RecordBatch, SchemaRef, + convert_transport_response, transform_response, CacheMode, MemberField, SchemaRef, }; use cubesql::compile::engine::df::wrapper::SqlQuery; use cubesql::transport::{ SpanId, SqlGenerator, SqlResponse, TransportLoadRequestQuery, TransportLoadResponse, - TransportMetaResponse, + TransportMetaResponse, TransportServiceLoadResponse, }; use cubesql::{ di_service, @@ -26,7 +29,7 @@ use cubesql::{ transport::{CubeStreamReceiver, LoadRequestMeta, MetaContext, TransportService}, CubeError, }; -use serde::Serialize; +use serde::{Deserialize, Serialize}; use std::sync::Arc; use uuid::Uuid; @@ -342,7 +345,7 @@ impl TransportService for NodeBridgeTransport { schema: SchemaRef, member_fields: Vec, cache_mode: Option, - ) -> Result, CubeError> { + ) -> Result, CubeError> { trace!("[transport] Request ->"); let native_auth = ctx @@ -402,14 +405,37 @@ impl TransportService for NodeBridgeTransport { .to_vec(cx) .map_cube_err("Can't convert JS result to array")?; - let native_wrapped_results = js_res_wrapped_vec - .iter() - .map(|r| ResultWrapper::from_js_result_wrapper(cx, *r)) - .collect::, _>>() + let get_root_result_object_method: Handle = + js_result_wrapped.get(cx, "getRootResultObject").map_cube_err( + "Can't get getRootResultObject method from JS ResultWrapper object", + )?; + + let result_data_js_array = get_root_result_object_method + .call(cx, js_result_wrapped.upcast::(), []) .map_cube_err( - "Can't construct result wrapper from JS ResultWrapper object", + "Error calling getRootResultObject() method of JS ResultWrapper object", )?; + let result_data_js_vec = result_data_js_array + .downcast::(cx) + .map_cube_err("Can't downcast getRootResultObject result to array")? + .to_vec(cx) + .map_cube_err("Can't convert getRootResultObject result to array")?; + + let mut native_wrapped_results = Vec::new(); + for (js_wrapper, js_result_data) in js_res_wrapped_vec.iter().zip(result_data_js_vec.iter()) { + let mut wrapper = ResultWrapper::from_js_result_wrapper(cx, *js_wrapper) + .map_cube_err("Can't construct result wrapper from JS ResultWrapper object")?; + + let deserializer = JsValueDeserializer::new(cx, *js_result_data); + let result_data: RequestResultData = Deserialize::deserialize(deserializer) + .map_cube_err("Can't deserialize RequestResultData from getRootResultObject")?; + + wrapper.last_refresh_time = result_data.last_refresh_time; + + native_wrapped_results.push(wrapper); + } + Ok(ValueFromJs::ResultWrapper(native_wrapped_results)) } else if let Ok(str) = v.downcast::(cx) { Ok(ValueFromJs::String(str.value(cx))) @@ -475,14 +501,33 @@ impl TransportService for NodeBridgeTransport { } }; - break convert_transport_response(response, schema.clone(), member_fields) - .map_err(|err| CubeError::user(err.to_string())); + break convert_transport_response(response, schema.clone(), member_fields); } ValueFromJs::ResultWrapper(result_wrappers) => { break result_wrappers .into_iter() .map(|mut wrapper| { - transform_response(&mut wrapper, schema.clone(), &member_fields) + let updated_schema = if let Some(last_refresh_time) = + wrapper.last_refresh_time.clone() + { + let mut metadata = schema.metadata().clone(); + metadata.insert("lastRefreshTime".to_string(), last_refresh_time); + Arc::new(Schema::new_with_metadata( + schema.fields().to_vec(), + metadata, + )) + } else { + schema.clone() + }; + + Ok(TransportServiceLoadResponse { + last_refresh_time: wrapper.last_refresh_time.clone(), + results_batch: transform_response( + &mut wrapper, + updated_schema, + &member_fields, + )?, + }) }) .collect::, _>>(); } diff --git a/rust/cubesql/cubesql/src/compile/engine/df/scan.rs b/rust/cubesql/cubesql/src/compile/engine/df/scan.rs index 627f51573e13f..8ebd82689801a 100644 --- a/rust/cubesql/cubesql/src/compile/engine/df/scan.rs +++ b/rust/cubesql/cubesql/src/compile/engine/df/scan.rs @@ -1,4 +1,5 @@ use crate::compile::date_parser::parse_date_str; +use crate::transport::TransportServiceLoadResponse; use crate::{ compile::{ engine::df::wrapper::{CubeScanWrappedSqlNode, CubeScanWrapperNode, SqlQuery}, @@ -18,7 +19,7 @@ pub use datafusion::{ ArrayRef, BooleanBuilder, Date32Builder, DecimalBuilder, Float32Builder, Float64Builder, Int16Builder, Int32Builder, Int64Builder, NullArray, StringBuilder, }, - datatypes::{DataType, SchemaRef}, + datatypes::{DataType, Schema, SchemaRef}, error::{ArrowError, Result as ArrowResult}, record_batch::RecordBatch, }, @@ -508,12 +509,13 @@ impl ExecutionPlan for CubeScanExecutionPlan { // For now execute method executes only one query at a time, so we // take the first result + let rb_schema = response.first().unwrap().schema().clone(); one_shot_stream.data = Some(response.first().unwrap().clone()); Ok(Box::pin(CubeScanStreamRouter::new( None, one_shot_stream, - self.schema.clone(), + rb_schema, ))) } @@ -734,6 +736,7 @@ async fn load_data( })?; let response = result.first(); if let Some(data) = response.cloned() { + let data = data.results_batch; match (options.max_records, data.num_rows()) { (Some(max_records), len) if len >= max_records => { return Err(ArrowError::ExternalError(Box::new(CubeError::user( @@ -1171,15 +1174,18 @@ pub fn convert_transport_response( response: V1LoadResponse, schema: SchemaRef, member_fields: Vec, -) -> std::result::Result, CubeError> { +) -> std::result::Result, CubeError> { response .results .into_iter() .map(|r| { let mut response = JsonValueObject::new(r.data.clone()); - transform_response(&mut response, schema.clone(), &member_fields) + Ok(TransportServiceLoadResponse { + last_refresh_time: r.last_refresh_time, + results_batch: transform_response(&mut response, schema.clone(), &member_fields)?, + }) }) - .collect::, CubeError>>() + .collect::, CubeError>>() } #[cfg(test)] @@ -1248,7 +1254,7 @@ mod tests { schema: SchemaRef, member_fields: Vec, _cache_mode: Option, - ) -> Result, CubeError> { + ) -> Result, CubeError> { let response = r#" { "results": [{ diff --git a/rust/cubesql/cubesql/src/compile/test/mod.rs b/rust/cubesql/cubesql/src/compile/test/mod.rs index 7efc5fdb0ab94..ed826048086a6 100644 --- a/rust/cubesql/cubesql/src/compile/test/mod.rs +++ b/rust/cubesql/cubesql/src/compile/test/mod.rs @@ -47,10 +47,9 @@ pub mod test_user_change; #[cfg(test)] pub mod test_wrapper; pub mod utils; +use crate::compile::engine::df::scan::convert_transport_response; use crate::compile::engine::df::scan::CacheMode; -use crate::compile::{ - arrow::record_batch::RecordBatch, engine::df::scan::convert_transport_response, -}; +use crate::transport::TransportServiceLoadResponse; pub use utils::*; pub fn get_test_meta() -> Vec { @@ -913,7 +912,7 @@ impl TransportService for TestConnectionTransport { schema: SchemaRef, member_fields: Vec, _cache_mode: Option, - ) -> Result, CubeError> { + ) -> Result, CubeError> { { let mut calls = self.load_calls.lock().await; calls.push(TestTransportLoadCall { diff --git a/rust/cubesql/cubesql/src/transport/service.rs b/rust/cubesql/cubesql/src/transport/service.rs index e6e5e32591f7d..7c9d815a86d81 100644 --- a/rust/cubesql/cubesql/src/transport/service.rs +++ b/rust/cubesql/cubesql/src/transport/service.rs @@ -113,6 +113,12 @@ impl SpanId { } } +#[derive(Clone)] +pub struct TransportServiceLoadResponse { + pub last_refresh_time: Option, + pub results_batch: RecordBatch, +} + #[async_trait] pub trait TransportService: Send + Sync + Debug { // Load meta information about cubes @@ -145,7 +151,7 @@ pub trait TransportService: Send + Sync + Debug { schema: SchemaRef, member_fields: Vec, cache_mode: Option, - ) -> Result, CubeError>; + ) -> Result, CubeError>; async fn load_stream( &self, @@ -193,11 +199,11 @@ struct MetaCacheBucket { value: Arc, } -/// This transports is used in standalone mode +/// This transport is used in standalone mode #[derive(Debug)] pub struct HttpTransport { /// We use simple cache to improve DX with standalone mode - /// because currently we dont persist DF in the SessionState + /// because currently we don't persist DF in the SessionState, /// and it causes a lot of HTTP requests which slow down BI connections cache: RwLockAsync>, } @@ -286,7 +292,7 @@ impl TransportService for HttpTransport { schema: SchemaRef, member_fields: Vec, cache_mode: Option, - ) -> Result, CubeError> { + ) -> Result, CubeError> { if meta.change_user().is_some() { return Err(CubeError::internal( "Changing security context (__user) is not supported in the standalone mode"