2 changes: 2 additions & 0 deletions packages/cubejs-api-gateway/openspec.yml
@@ -341,6 +341,8 @@ components:
type: "array"
items:
type: "object"
lastRefreshTime:
type: "string"
V1Error:
type: "object"
required:
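The OpenAPI addition above declares `lastRefreshTime` as an optional string on the load-result schema. A minimal sketch of the resulting wire shape, with illustrative member names and timestamp (only the `lastRefreshTime` key comes from the spec):

```rust
use serde_json::json;

fn main() {
    // Hypothetical /load result payload; "orders.count" and the timestamp
    // are made up for illustration, only "lastRefreshTime" is from the spec.
    let result = json!({
        "data": [{ "orders.count": 42 }],
        "lastRefreshTime": "2024-05-01T12:00:00.000Z"
    });
    assert_eq!(result["lastRefreshTime"], "2024-05-01T12:00:00.000Z");
}
```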
7 changes: 7 additions & 0 deletions packages/cubejs-backend-native/src/node_export.rs
@@ -327,6 +327,13 @@ async fn handle_sql_query(
let mut schema_response = Map::new();
schema_response.insert("schema".into(), columns_json);

if let Some(last_refresh_time) = stream_schema.metadata().get("lastRefreshTime") {
schema_response.insert(
"lastRefreshTime".into(),
serde_json::Value::String(last_refresh_time.clone()),
);
}

write_jsonl_message(
channel.clone(),
stream_methods.write.clone(),
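With this change, the first JSONL message of a streamed SQL API response can carry the timestamp read from the Arrow schema metadata. A sketch of what a consumer might see, assuming a single string column (the column listing is illustrative; only the `lastRefreshTime` key is taken from the diff):

```rust
use serde_json::Value;

fn main() {
    // Hypothetical first JSONL line of a streamed response; the "schema"
    // contents are illustrative, the "lastRefreshTime" key matches the diff.
    let line = r#"{"schema":[{"name":"city","type":"string"}],"lastRefreshTime":"2024-05-01T12:00:00.000Z"}"#;
    let msg: Value = serde_json::from_str(line).unwrap();
    assert!(msg.get("lastRefreshTime").is_some());
}
```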
2 changes: 2 additions & 0 deletions packages/cubejs-backend-native/src/orchestrator.rs
@@ -37,6 +37,7 @@ pub struct ResultWrapper
transform_data: TransformDataRequest,
data: Arc<QueryResult>,
transformed_data: Option<TransformedData>,
pub last_refresh_time: Option<String>,
}

impl ResultWrapper {
@@ -114,6 +115,7 @@ impl ResultWrapper
transform_data: transform_request,
data: query_result,
transformed_data: None,
last_refresh_time: None,
})
}

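The new public field follows a construct-then-populate pattern: it defaults to `None` here and is filled in by the transport layer (see `transport.rs` below) once the JS-side result object has been deserialized. A simplified sketch, with the type reduced to the new field only:

```rust
// Stand-in for the real ResultWrapper; only the new field is modeled.
struct ResultWrapper {
    last_refresh_time: Option<String>,
}

fn main() {
    let mut wrapper = ResultWrapper { last_refresh_time: None };
    // Later, transport.rs copies the value out of the deserialized
    // RequestResultData (timestamp value is illustrative):
    wrapper.last_refresh_time = Some("2024-05-01T12:00:00.000Z".to_string());
    assert!(wrapper.last_refresh_time.is_some());
}
```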
69 changes: 57 additions & 12 deletions packages/cubejs-backend-native/src/transport.rs
@@ -5,28 +5,31 @@ use std::fmt::Display;

use crate::auth::NativeSQLAuthContext;
use crate::channel::{call_raw_js_with_channel_as_callback, NodeSqlGenerator, ValueFromJs};
use crate::node_obj_deserializer::JsValueDeserializer;
use crate::node_obj_serializer::NodeObjSerializer;
use crate::orchestrator::ResultWrapper;
use crate::{
auth::TransportRequest, channel::call_js_with_channel_as_callback,
stream::call_js_with_stream_as_callback,
};
use async_trait::async_trait;
use cubeorchestrator::query_result_transform::RequestResultData;
use cubesql::compile::arrow::datatypes::Schema;
use cubesql::compile::engine::df::scan::{
convert_transport_response, transform_response, CacheMode, MemberField, RecordBatch, SchemaRef,
convert_transport_response, transform_response, CacheMode, MemberField, SchemaRef,
};
use cubesql::compile::engine::df::wrapper::SqlQuery;
use cubesql::transport::{
SpanId, SqlGenerator, SqlResponse, TransportLoadRequestQuery, TransportLoadResponse,
TransportMetaResponse,
TransportMetaResponse, TransportServiceLoadResponse,
};
use cubesql::{
di_service,
sql::AuthContextRef,
transport::{CubeStreamReceiver, LoadRequestMeta, MetaContext, TransportService},
CubeError,
};
use serde::Serialize;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use uuid::Uuid;

@@ -342,7 +345,7 @@ impl TransportService for NodeBridgeTransport {
schema: SchemaRef,
member_fields: Vec<MemberField>,
cache_mode: Option<CacheMode>,
) -> Result<Vec<RecordBatch>, CubeError> {
) -> Result<Vec<TransportServiceLoadResponse>, CubeError> {
trace!("[transport] Request ->");

let native_auth = ctx
@@ -402,14 +405,37 @@
.to_vec(cx)
.map_cube_err("Can't convert JS result to array")?;

let native_wrapped_results = js_res_wrapped_vec
.iter()
.map(|r| ResultWrapper::from_js_result_wrapper(cx, *r))
.collect::<Result<Vec<_>, _>>()
let get_root_result_object_method: Handle<JsFunction> =
js_result_wrapped.get(cx, "getRootResultObject").map_cube_err(
"Can't get getRootResultObject method from JS ResultWrapper object",
)?;

let result_data_js_array = get_root_result_object_method
.call(cx, js_result_wrapped.upcast::<JsValue>(), [])
.map_cube_err(
"Can't construct result wrapper from JS ResultWrapper object",
"Error calling getRootResultObject() method of JS ResultWrapper object",
)?;

let result_data_js_vec = result_data_js_array
.downcast::<JsArray, _>(cx)
.map_cube_err("Can't downcast getRootResultObject result to array")?
.to_vec(cx)
.map_cube_err("Can't convert getRootResultObject result to array")?;

let mut native_wrapped_results = Vec::new();
for (js_wrapper, js_result_data) in js_res_wrapped_vec.iter().zip(result_data_js_vec.iter()) {
let mut wrapper = ResultWrapper::from_js_result_wrapper(cx, *js_wrapper)
.map_cube_err("Can't construct result wrapper from JS ResultWrapper object")?;

let deserializer = JsValueDeserializer::new(cx, *js_result_data);
let result_data: RequestResultData = Deserialize::deserialize(deserializer)
.map_cube_err("Can't deserialize RequestResultData from getRootResultObject")?;

wrapper.last_refresh_time = result_data.last_refresh_time;

native_wrapped_results.push(wrapper);
}

Ok(ValueFromJs::ResultWrapper(native_wrapped_results))
} else if let Ok(str) = v.downcast::<JsString, _>(cx) {
Ok(ValueFromJs::String(str.value(cx)))
@@ -475,14 +501,33 @@
}
};

break convert_transport_response(response, schema.clone(), member_fields)
.map_err(|err| CubeError::user(err.to_string()));
break convert_transport_response(response, schema.clone(), member_fields);
}
ValueFromJs::ResultWrapper(result_wrappers) => {
break result_wrappers
.into_iter()
.map(|mut wrapper| {
transform_response(&mut wrapper, schema.clone(), &member_fields)
let updated_schema = if let Some(last_refresh_time) =
wrapper.last_refresh_time.clone()
{
let mut metadata = schema.metadata().clone();
metadata.insert("lastRefreshTime".to_string(), last_refresh_time);
Arc::new(Schema::new_with_metadata(
schema.fields().to_vec(),
metadata,
))
} else {
schema.clone()
};

Ok(TransportServiceLoadResponse {
last_refresh_time: wrapper.last_refresh_time.clone(),
results_batch: transform_response(
&mut wrapper,
updated_schema,
&member_fields,
)?,
})
})
.collect::<Result<Vec<_>, _>>();
}
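The central trick in this hunk is carrying `lastRefreshTime` through Arrow schema metadata, so it survives the record-batch pipeline and resurfaces in `node_export.rs`. A self-contained sketch of the schema rebuild, assuming the stock arrow/datafusion APIs (cubesql re-exports its own arrow types, so the import paths here are an approximation):

```rust
use std::sync::Arc;
use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};

// Rebuild a schema with an extra metadata entry, mirroring what the diff
// does for each ResultWrapper that carries a last_refresh_time.
fn with_last_refresh_time(schema: &SchemaRef, t: String) -> SchemaRef {
    let mut metadata = schema.metadata().clone();
    metadata.insert("lastRefreshTime".to_string(), t);
    Arc::new(Schema::new_with_metadata(schema.fields().to_vec(), metadata))
}

fn main() {
    let schema: SchemaRef = Arc::new(Schema::new(vec![Field::new("city", DataType::Utf8, true)]));
    let updated = with_last_refresh_time(&schema, "2024-05-01T12:00:00.000Z".into());
    assert!(updated.metadata().contains_key("lastRefreshTime"));
}
```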
3 changes: 3 additions & 0 deletions rust/cubesql/cubeclient/src/models/v1_load_result.rs
@@ -21,6 +21,8 @@ pub struct V1LoadResult {
pub data: Vec<serde_json::Value>,
#[serde(rename = "refreshKeyValues", skip_serializing_if = "Option::is_none")]
pub refresh_key_values: Option<Vec<serde_json::Value>>,
#[serde(rename = "lastRefreshTime", skip_serializing_if = "Option::is_none")]
pub last_refresh_time: Option<String>,
}

impl V1LoadResult {
@@ -33,6 +35,7 @@ impl V1LoadResult {
annotation: Box::new(annotation),
data,
refresh_key_values: None,
last_refresh_time: None,
}
}
}
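Because of `skip_serializing_if = "Option::is_none"`, servers that never set the field keep producing byte-identical JSON. A small sketch of that serialization behavior, with the struct reduced to the new field:

```rust
use serde::Serialize;

#[derive(Serialize)]
struct LoadResult {
    #[serde(rename = "lastRefreshTime", skip_serializing_if = "Option::is_none")]
    last_refresh_time: Option<String>,
}

fn main() {
    // None: the key is omitted entirely, keeping old payloads unchanged.
    let unset = serde_json::to_string(&LoadResult { last_refresh_time: None }).unwrap();
    assert_eq!(unset, "{}");

    // Some: the renamed camelCase key appears on the wire.
    let set = serde_json::to_string(&LoadResult {
        last_refresh_time: Some("2024-05-01T12:00:00.000Z".into()),
    })
    .unwrap();
    assert!(set.contains("lastRefreshTime"));
}
```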
18 changes: 12 additions & 6 deletions rust/cubesql/cubesql/src/compile/engine/df/scan.rs
@@ -1,4 +1,5 @@
use crate::compile::date_parser::parse_date_str;
use crate::transport::TransportServiceLoadResponse;
use crate::{
compile::{
engine::df::wrapper::{CubeScanWrappedSqlNode, CubeScanWrapperNode, SqlQuery},
@@ -18,7 +19,7 @@ pub use datafusion::{
ArrayRef, BooleanBuilder, Date32Builder, DecimalBuilder, Float32Builder,
Float64Builder, Int16Builder, Int32Builder, Int64Builder, NullArray, StringBuilder,
},
datatypes::{DataType, SchemaRef},
datatypes::{DataType, Schema, SchemaRef},
error::{ArrowError, Result as ArrowResult},
record_batch::RecordBatch,
},
@@ -508,12 +509,13 @@ impl ExecutionPlan for CubeScanExecutionPlan {

// For now execute method executes only one query at a time, so we
// take the first result
let rb_schema = response.first().unwrap().schema().clone();
one_shot_stream.data = Some(response.first().unwrap().clone());

Ok(Box::pin(CubeScanStreamRouter::new(
None,
one_shot_stream,
self.schema.clone(),
rb_schema,
)))
}

@@ -734,6 +736,7 @@ async fn load_data(
})?;
let response = result.first();
if let Some(data) = response.cloned() {
let data = data.results_batch;
match (options.max_records, data.num_rows()) {
(Some(max_records), len) if len >= max_records => {
return Err(ArrowError::ExternalError(Box::new(CubeError::user(
@@ -1171,15 +1174,18 @@ pub fn convert_transport_response(
response: V1LoadResponse,
schema: SchemaRef,
member_fields: Vec<MemberField>,
) -> std::result::Result<Vec<RecordBatch>, CubeError> {
) -> std::result::Result<Vec<TransportServiceLoadResponse>, CubeError> {
response
.results
.into_iter()
.map(|r| {
let mut response = JsonValueObject::new(r.data.clone());
transform_response(&mut response, schema.clone(), &member_fields)
Ok(TransportServiceLoadResponse {
last_refresh_time: r.last_refresh_time,
results_batch: transform_response(&mut response, schema.clone(), &member_fields)?,
})
})
.collect::<std::result::Result<Vec<RecordBatch>, CubeError>>()
.collect::<std::result::Result<Vec<TransportServiceLoadResponse>, CubeError>>()
}

#[cfg(test)]
@@ -1248,7 +1254,7 @@ mod tests {
schema: SchemaRef,
member_fields: Vec<MemberField>,
_cache_mode: Option<CacheMode>,
) -> Result<Vec<RecordBatch>, CubeError> {
) -> Result<Vec<TransportServiceLoadResponse>, CubeError> {
let response = r#"
{
"results": [{
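Note the `rb_schema` change in `execute`: the stream router is now built from the first returned batch's schema rather than the plan's own schema, which is what lets the metadata attached in `transport.rs` reach the output stream. A sketch of the distinction, again assuming stock arrow APIs:

```rust
use std::collections::HashMap;
use std::sync::Arc;
use datafusion::arrow::array::StringArray;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;

fn main() {
    let mut metadata = HashMap::new();
    metadata.insert("lastRefreshTime".to_string(), "2024-05-01T12:00:00.000Z".to_string());
    let schema = Arc::new(Schema::new_with_metadata(
        vec![Field::new("city", DataType::Utf8, true)],
        metadata,
    ));
    let batch = RecordBatch::try_new(schema, vec![Arc::new(StringArray::from(vec!["NYC"]))]).unwrap();

    // The batch's schema keeps the metadata; a schema derived from the
    // logical plan alone would not carry it.
    assert!(batch.schema().metadata().contains_key("lastRefreshTime"));
}
```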
7 changes: 3 additions & 4 deletions rust/cubesql/cubesql/src/compile/test/mod.rs
@@ -47,10 +47,9 @@ pub mod test_user_change;
#[cfg(test)]
pub mod test_wrapper;
pub mod utils;
use crate::compile::engine::df::scan::convert_transport_response;
use crate::compile::engine::df::scan::CacheMode;
use crate::compile::{
arrow::record_batch::RecordBatch, engine::df::scan::convert_transport_response,
};
use crate::transport::TransportServiceLoadResponse;
pub use utils::*;

pub fn get_test_meta() -> Vec<CubeMeta> {
@@ -913,7 +912,7 @@ impl TransportService for TestConnectionTransport {
schema: SchemaRef,
member_fields: Vec<MemberField>,
_cache_mode: Option<CacheMode>,
) -> Result<Vec<RecordBatch>, CubeError> {
) -> Result<Vec<TransportServiceLoadResponse>, CubeError> {
{
let mut calls = self.load_calls.lock().await;
calls.push(TestTransportLoadCall {
14 changes: 10 additions & 4 deletions rust/cubesql/cubesql/src/transport/service.rs
@@ -113,6 +113,12 @@ impl SpanId {
}
}

#[derive(Clone)]
pub struct TransportServiceLoadResponse {
pub last_refresh_time: Option<String>,
pub results_batch: RecordBatch,
}

#[async_trait]
pub trait TransportService: Send + Sync + Debug {
// Load meta information about cubes
@@ -145,7 +151,7 @@ pub trait TransportService: Send + Sync + Debug {
schema: SchemaRef,
member_fields: Vec<MemberField>,
cache_mode: Option<CacheMode>,
) -> Result<Vec<RecordBatch>, CubeError>;
) -> Result<Vec<TransportServiceLoadResponse>, CubeError>;

async fn load_stream(
&self,
@@ -193,11 +199,11 @@ struct MetaCacheBucket {
value: Arc<MetaContext>,
}

/// This transports is used in standalone mode
/// This transport is used in standalone mode
#[derive(Debug)]
pub struct HttpTransport {
/// We use simple cache to improve DX with standalone mode
/// because currently we dont persist DF in the SessionState
/// because currently we don't persist DF in the SessionState,
/// and it causes a lot of HTTP requests which slow down BI connections
cache: RwLockAsync<Option<MetaCacheBucket>>,
}
@@ -286,7 +292,7 @@ impl TransportService for HttpTransport {
schema: SchemaRef,
member_fields: Vec<MemberField>,
cache_mode: Option<CacheMode>,
) -> Result<Vec<RecordBatch>, CubeError> {
) -> Result<Vec<TransportServiceLoadResponse>, CubeError> {
if meta.change_user().is_some() {
return Err(CubeError::internal(
"Changing security context (__user) is not supported in the standalone mode"
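Callers of `load()` now receive each batch paired with its freshness timestamp. A minimal consumption sketch, with the `RecordBatch` replaced by a placeholder so the example stays dependency-free:

```rust
// Simplified stand-in for the new response type; the real struct pairs
// last_refresh_time with an arrow RecordBatch.
struct TransportServiceLoadResponse {
    last_refresh_time: Option<String>,
    results_batch: Vec<String>, // placeholder for RecordBatch
}

fn main() {
    let responses = vec![TransportServiceLoadResponse {
        last_refresh_time: Some("2024-05-01T12:00:00.000Z".into()),
        results_batch: vec!["row".into()],
    }];
    for r in &responses {
        // Downstream code can forward the timestamp alongside each batch.
        println!("{} rows, refreshed at {:?}", r.results_batch.len(), r.last_refresh_time);
    }
}
```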