Skip to content

Commit 8ffc299

Browse files
committed
Merge remote-tracking branch 'upstream/main' into feat/metrics
2 parents 7998c3a + 38068c4 commit 8ffc299

File tree

46 files changed

+1905
-412
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

46 files changed

+1905
-412
lines changed

acceptance/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ tar = "0.4"
3636
[dev-dependencies]
3737
datatest-stable = "0.3"
3838
test-log = { version = "0.2", default-features = false, features = ["trace"] }
39+
test_utils = { path = "../test-utils" }
3940
tempfile = "3"
4041
test-case = { version = "3.3.1" }
4142
tokio = { version = "1.47" }

acceptance/tests/dat_reader.rs

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,6 @@
11
use std::path::Path;
2-
use std::sync::Arc;
32

43
use acceptance::read_dat_case;
5-
use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor;
6-
use delta_kernel::engine::default::DefaultEngine;
74

85
// TODO(zach): skip iceberg_compat_v1 test until DAT is fixed
96
static SKIPPED_TESTS: &[&str; 1] = &["iceberg_compat_v1"];
@@ -27,14 +24,7 @@ fn reader_test(path: &Path) -> datatest_stable::Result<()> {
2724
.block_on(async {
2825
let case = read_dat_case(root_dir).unwrap();
2926
let table_root = case.table_root().unwrap();
30-
let engine = Arc::new(
31-
DefaultEngine::try_new(
32-
&table_root,
33-
std::iter::empty::<(&str, &str)>(),
34-
Arc::new(TokioBackgroundExecutor::new()),
35-
)
36-
.unwrap(),
37-
);
27+
let engine = test_utils::create_default_engine(&table_root).unwrap();
3828

3929
case.assert_metadata(engine.clone()).await.unwrap();
4030
acceptance::data::assert_scan_metadata(engine.clone(), &case)

acceptance/tests/other.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,7 @@ async fn test_read_table_with_checkpoint() {
3838
))
3939
.unwrap();
4040
let location = url::Url::from_directory_path(path).unwrap();
41-
let engine = Arc::new(
42-
DefaultEngine::try_new(&location, HashMap::<String, String>::new()).unwrap(),
43-
);
41+
let engine = test_utils::create_default_engine(&location).unwrap();
4442
let snapshot = Snapshot::try_new(location, engine, None)
4543
.await
4644
.unwrap();

ffi/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ object_store = "0.12.3"
4646
default = ["default-engine-rustls"]
4747
default-engine-native-tls = ["delta_kernel/default-engine-native-tls", "default-engine-base"]
4848
default-engine-rustls = ["delta_kernel/default-engine-rustls", "default-engine-base"]
49+
catalog-managed = ["delta_kernel/catalog-managed"]
4950

5051
# This is an 'internal' feature flag which has all the shared bits from default-engine-native-tls and
5152
# default-engine-rustls. There is a check in kernel/lib.rs to ensure you have enabled one of

ffi/src/domain_metadata.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,6 @@ mod tests {
9393
recover_string,
9494
};
9595
use crate::{engine_to_handle, free_engine, free_snapshot, kernel_string_slice, snapshot};
96-
use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor;
9796
use delta_kernel::engine::default::DefaultEngine;
9897
use delta_kernel::DeltaResult;
9998
use object_store::memory::InMemory;
@@ -107,7 +106,7 @@ mod tests {
107106
async fn test_domain_metadata() -> DeltaResult<()> {
108107
let storage = Arc::new(InMemory::new());
109108

110-
let engine = DefaultEngine::new(storage.clone(), Arc::new(TokioBackgroundExecutor::new()));
109+
let engine = DefaultEngine::new(storage.clone());
111110
let engine = engine_to_handle(Arc::new(engine), allocate_err);
112111
let path = "memory:///";
113112

ffi/src/lib.rs

Lines changed: 139 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,7 @@ use url::Url;
1313

1414
use delta_kernel::schema::Schema;
1515
use delta_kernel::snapshot::Snapshot;
16-
use delta_kernel::Version;
17-
use delta_kernel::{DeltaResult, Engine, EngineData};
16+
use delta_kernel::{DeltaResult, Engine, EngineData, LogPath, Version};
1817
use delta_kernel_ffi_macros::handle_descriptor;
1918

2019
// cbindgen doesn't understand our use of feature flags here, and by default it parses `mod handle`
@@ -43,6 +42,8 @@ use error::{AllocateError, AllocateErrorFn, ExternResult, IntoExternResult};
4342
pub mod expressions;
4443
#[cfg(feature = "tracing")]
4544
pub mod ffi_tracing;
45+
#[cfg(feature = "catalog-managed")]
46+
pub mod log_path;
4647
pub mod scan;
4748
pub mod schema;
4849

@@ -560,13 +561,11 @@ fn get_default_engine_impl(
560561
allocate_error: AllocateErrorFn,
561562
) -> DeltaResult<Handle<SharedExternEngine>> {
562563
use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor;
564+
use delta_kernel::engine::default::storage::store_from_url_opts;
563565
use delta_kernel::engine::default::DefaultEngine;
564-
let engine = DefaultEngine::<TokioBackgroundExecutor>::try_new(
565-
&url,
566-
options,
567-
Arc::new(TokioBackgroundExecutor::new()),
568-
);
569-
Ok(engine_to_handle(Arc::new(engine?), allocate_error))
566+
let store = store_from_url_opts(&url, options)?;
567+
let engine = DefaultEngine::<TokioBackgroundExecutor>::new(store);
568+
Ok(engine_to_handle(Arc::new(engine), allocate_error))
570569
}
571570

572571
/// # Safety
@@ -596,10 +595,36 @@ pub unsafe extern "C" fn snapshot(
596595
) -> ExternResult<Handle<SharedSnapshot>> {
597596
let url = unsafe { unwrap_and_parse_path_as_url(path) };
598597
let engine = unsafe { engine.as_ref() };
599-
snapshot_impl(url, engine, None).into_extern_result(&engine)
598+
snapshot_impl(url, engine, None, Vec::new()).into_extern_result(&engine)
599+
}
600+
601+
/// Get the latest snapshot from the specified table with optional log tail
602+
///
603+
/// # Safety
604+
///
605+
/// Caller is responsible for passing valid handles and path pointer.
606+
/// The log_paths array and its contents must remain valid for the duration of this call.
607+
#[cfg(feature = "catalog-managed")]
608+
#[no_mangle]
609+
pub unsafe extern "C" fn snapshot_with_log_tail(
610+
path: KernelStringSlice,
611+
engine: Handle<SharedExternEngine>,
612+
log_paths: log_path::LogPathArray,
613+
) -> ExternResult<Handle<SharedSnapshot>> {
614+
let url = unsafe { unwrap_and_parse_path_as_url(path) };
615+
let engine_ref = unsafe { engine.as_ref() };
616+
617+
// Convert LogPathArray to Vec<LogPath>
618+
let log_tail = match unsafe { log_paths.log_paths() } {
619+
Ok(paths) => paths,
620+
Err(err) => return Err(err).into_extern_result(&engine_ref),
621+
};
622+
623+
snapshot_impl(url, engine_ref, None, log_tail).into_extern_result(&engine_ref)
600624
}
601625

602-
/// Get the snapshot from the specified table at a specific version
626+
/// Get the snapshot from the specified table at a specific version. Note this is only safe for
627+
/// non-catalog-managed tables.
603628
///
604629
/// # Safety
605630
///
@@ -612,21 +637,52 @@ pub unsafe extern "C" fn snapshot_at_version(
612637
) -> ExternResult<Handle<SharedSnapshot>> {
613638
let url = unsafe { unwrap_and_parse_path_as_url(path) };
614639
let engine = unsafe { engine.as_ref() };
615-
snapshot_impl(url, engine, version.into()).into_extern_result(&engine)
640+
snapshot_impl(url, engine, version.into(), Vec::new()).into_extern_result(&engine)
641+
}
642+
643+
/// Get the snapshot from the specified table at a specific version with log tail.
644+
///
645+
/// # Safety
646+
///
647+
/// Caller is responsible for passing valid handles and path pointer.
648+
/// The log_tail array and its contents must remain valid for the duration of this call.
649+
#[cfg(feature = "catalog-managed")]
650+
#[no_mangle]
651+
pub unsafe extern "C" fn snapshot_at_version_with_log_tail(
652+
path: KernelStringSlice,
653+
engine: Handle<SharedExternEngine>,
654+
version: Version,
655+
log_tail: log_path::LogPathArray,
656+
) -> ExternResult<Handle<SharedSnapshot>> {
657+
let url = unsafe { unwrap_and_parse_path_as_url(path) };
658+
let engine_ref = unsafe { engine.as_ref() };
659+
660+
// Convert LogPathArray to Vec<LogPath>
661+
let log_tail = match unsafe { log_tail.log_paths() } {
662+
Ok(paths) => paths,
663+
Err(err) => return Err(err).into_extern_result(&engine_ref),
664+
};
665+
666+
snapshot_impl(url, engine_ref, version.into(), log_tail).into_extern_result(&engine_ref)
616667
}
617668

618669
fn snapshot_impl(
619670
url: DeltaResult<Url>,
620671
extern_engine: &dyn ExternEngine,
621672
version: Option<Version>,
673+
#[allow(unused_variables)] log_tail: Vec<LogPath>,
622674
) -> DeltaResult<Handle<SharedSnapshot>> {
623-
let builder = Snapshot::builder_for(url?);
624-
let builder = if let Some(v) = version {
625-
// TODO: should we include a `with_version_opt` method for the builder?
626-
builder.at_version(v)
627-
} else {
628-
builder
629-
};
675+
let mut builder = Snapshot::builder_for(url?);
676+
677+
if let Some(v) = version {
678+
builder = builder.at_version(v);
679+
}
680+
681+
#[cfg(feature = "catalog-managed")]
682+
if !log_tail.is_empty() {
683+
builder = builder.with_log_tail(log_tail);
684+
}
685+
630686
let snapshot = builder.build(extern_engine.engine().as_ref())?;
631687
Ok(snapshot.into())
632688
}
@@ -823,7 +879,7 @@ mod tests {
823879
allocate_err, allocate_str, assert_extern_result_error_with_message, ok_or_panic,
824880
recover_string,
825881
};
826-
use delta_kernel::engine::default::{executor::tokio::TokioBackgroundExecutor, DefaultEngine};
882+
use delta_kernel::engine::default::DefaultEngine;
827883
use object_store::memory::InMemory;
828884
use test_utils::{actions_to_string, actions_to_string_partitioned, add_commit, TestAction};
829885

@@ -870,7 +926,7 @@ mod tests {
870926
actions_to_string(vec![TestAction::Metadata]),
871927
)
872928
.await?;
873-
let engine = DefaultEngine::new(storage.clone(), Arc::new(TokioBackgroundExecutor::new()));
929+
let engine = DefaultEngine::new(storage.clone());
874930
let engine = engine_to_handle(Arc::new(engine), allocate_err);
875931
let path = "memory:///";
876932

@@ -916,7 +972,7 @@ mod tests {
916972
actions_to_string_partitioned(vec![TestAction::Metadata]),
917973
)
918974
.await?;
919-
let engine = DefaultEngine::new(storage.clone(), Arc::new(TokioBackgroundExecutor::new()));
975+
let engine = DefaultEngine::new(storage.clone());
920976
let engine = engine_to_handle(Arc::new(engine), allocate_err);
921977
let path = "memory:///";
922978

@@ -952,7 +1008,7 @@ mod tests {
9521008
actions_to_string(vec![TestAction::Metadata]),
9531009
)
9541010
.await?;
955-
let engine = DefaultEngine::new(storage.clone(), Arc::new(TokioBackgroundExecutor::new()));
1011+
let engine = DefaultEngine::new(storage.clone());
9561012
let engine = engine_to_handle(Arc::new(engine), allocate_null_err);
9571013
let path = "memory:///";
9581014

@@ -964,4 +1020,65 @@ mod tests {
9641020
unsafe { free_engine(engine) }
9651021
Ok(())
9661022
}
1023+
1024+
#[cfg(feature = "catalog-managed")]
1025+
#[tokio::test]
1026+
async fn test_snapshot_log_tail() -> Result<(), Box<dyn std::error::Error>> {
1027+
use test_utils::add_staged_commit;
1028+
let storage = Arc::new(InMemory::new());
1029+
add_commit(
1030+
storage.as_ref(),
1031+
0,
1032+
actions_to_string(vec![TestAction::Metadata]),
1033+
)
1034+
.await?;
1035+
let commit1 = add_staged_commit(
1036+
storage.as_ref(),
1037+
1,
1038+
actions_to_string(vec![TestAction::Add("path1".into())]),
1039+
)
1040+
.await?;
1041+
let engine = DefaultEngine::new(storage.clone());
1042+
let engine = engine_to_handle(Arc::new(engine), allocate_err);
1043+
let path = "memory:///";
1044+
1045+
let commit1_path = format!(
1046+
"{}_delta_log/_staged_commits/{}",
1047+
path,
1048+
commit1.filename().unwrap()
1049+
);
1050+
let log_path =
1051+
log_path::FfiLogPath::new(kernel_string_slice!(commit1_path), 123456789, 100);
1052+
let log_tail = [log_path];
1053+
let log_tail = log_path::LogPathArray {
1054+
ptr: log_tail.as_ptr(),
1055+
len: log_tail.len(),
1056+
};
1057+
let snapshot = unsafe {
1058+
ok_or_panic(snapshot_with_log_tail(
1059+
kernel_string_slice!(path),
1060+
engine.shallow_copy(),
1061+
log_tail.clone(),
1062+
))
1063+
};
1064+
let snapshot_version = unsafe { version(snapshot.shallow_copy()) };
1065+
assert_eq!(snapshot_version, 1);
1066+
1067+
// Test getting snapshot at version
1068+
let snapshot2 = unsafe {
1069+
ok_or_panic(snapshot_at_version_with_log_tail(
1070+
kernel_string_slice!(path),
1071+
engine.shallow_copy(),
1072+
1,
1073+
log_tail,
1074+
))
1075+
};
1076+
let snapshot_version = unsafe { version(snapshot.shallow_copy()) };
1077+
assert_eq!(snapshot_version, 1);
1078+
1079+
unsafe { free_snapshot(snapshot) }
1080+
unsafe { free_snapshot(snapshot2) }
1081+
unsafe { free_engine(engine) }
1082+
Ok(())
1083+
}
9671084
}

0 commit comments

Comments
 (0)