Skip to content

Commit 4442deb

Browse files
samansmink and nicklan
authored
feat: allow visiting entire domain metadata (#1384)
## What changes are proposed in this pull request? Adding the ability to scan the entire domain metadata. This PR is in a way a follow-up to #1342. I wanted to use the `visit_string_map` function from that PR, but then we would run into some ownership/lifetime issues: we would need to return a pointer to a kernel-allocated map. I thought the nicest solution would be to just create a separate visitor function which avoids any lifetime issues by scoping things to the visitor callback. I also considered: ```rust pub unsafe extern "C" fn visit_domain_metadata( snapshot: Handle<SharedSnapshot>, engine: Handle<SharedExternEngine>, engine_context: NullableCvoid, visitor: extern "C" fn( engine_context: NullableCvoid, map: &CStringMap, ), ) ``` which would allow reusing the `visit_string_map` function and make this more in line with how the engine is expected to consume other string maps, but I don't think the extra level of indirection really adds much and the visitor is very simple anyway. ### This PR affects the following public APIs - Added new `Snapshot::get_all_domain_metadata` function - Added new ffi `visit_domain_metadata` to go over all domain metadata ## How was this change tested? - Testing code added to existing `kernel::snapshot::tests::test_domain_metadata` test - Testing code added to existing `ffi::domain_metadata::tests::test_domain_metadata` test --------- Co-authored-by: Nick Lanham <nicklan@users.noreply.github.com>
1 parent 769157f commit 4442deb

File tree

4 files changed

+143
-4
lines changed

4 files changed

+143
-4
lines changed

ffi/src/domain_metadata.rs

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,53 @@ fn get_domain_metadata_impl(
3737
.and_then(|config: String| allocate_fn(kernel_string_slice!(config))))
3838
}
3939

40+
/// Get the domain metadata as an optional string allocated by `AllocatedStringFn` for a specific domain in this snapshot
41+
///
42+
/// # Safety
43+
///
44+
/// Caller is responsible for passing in a valid handle
45+
#[no_mangle]
46+
pub unsafe extern "C" fn visit_domain_metadata(
47+
snapshot: Handle<SharedSnapshot>,
48+
engine: Handle<SharedExternEngine>,
49+
engine_context: NullableCvoid,
50+
visitor: extern "C" fn(
51+
engine_context: NullableCvoid,
52+
domain: KernelStringSlice,
53+
configuration: KernelStringSlice,
54+
),
55+
) -> ExternResult<bool> {
56+
let snapshot = unsafe { snapshot.as_ref() };
57+
let engine = unsafe { engine.as_ref() };
58+
59+
visit_domain_metadata_impl(snapshot, engine, engine_context, visitor)
60+
.into_extern_result(&engine)
61+
}
62+
63+
fn visit_domain_metadata_impl(
64+
snapshot: &Snapshot,
65+
extern_engine: &dyn ExternEngine,
66+
engine_context: NullableCvoid,
67+
visitor: extern "C" fn(
68+
engine_context: NullableCvoid,
69+
key: KernelStringSlice,
70+
value: KernelStringSlice,
71+
),
72+
) -> DeltaResult<bool> {
73+
let res = snapshot.get_all_domain_metadata(extern_engine.engine().as_ref())?;
74+
res.iter().for_each(|metadata| {
75+
let domain = &metadata.domain();
76+
let configuration = &metadata.configuration();
77+
visitor(
78+
engine_context,
79+
kernel_string_slice!(domain),
80+
kernel_string_slice!(configuration),
81+
);
82+
});
83+
84+
Ok(true)
85+
}
86+
4087
#[cfg(test)]
4188
mod tests {
4289
use super::*;
@@ -51,6 +98,8 @@ mod tests {
5198
use delta_kernel::DeltaResult;
5299
use object_store::memory::InMemory;
53100
use serde_json::json;
101+
use std::collections::HashMap;
102+
use std::ptr::NonNull;
54103
use std::sync::Arc;
55104
use test_utils::add_commit;
56105

@@ -148,6 +197,8 @@ mod tests {
148197
)
149198
};
150199

200+
// First, we test fetching the domain metadata one-by-one
201+
151202
let domain1 = "domain1";
152203
let res = ok_or_panic(get_domain_metadata_helper(domain1));
153204
assert!(res.is_none());
@@ -160,6 +211,49 @@ mod tests {
160211
let res = get_domain_metadata_helper(domain3);
161212
assert_extern_result_error_with_message(res, KernelError::GenericError, "Generic delta kernel error: User DomainMetadata are not allowed to use system-controlled 'delta.*' domain");
162213

214+
// Secondly, we visit the entire domain metadata
215+
216+
// Create visitor state
217+
let visitor_state: Box<HashMap<String, String>> = Box::default();
218+
let visitor_state_ptr = Box::into_raw(visitor_state);
219+
220+
// Test visitor function
221+
extern "C" fn visitor(
222+
state: NullableCvoid,
223+
key: KernelStringSlice,
224+
value: KernelStringSlice,
225+
) {
226+
let mut collected_metadata = unsafe {
227+
Box::from_raw(
228+
state.unwrap().as_ptr() as *mut std::collections::HashMap<String, String>
229+
)
230+
};
231+
let key: DeltaResult<String> = unsafe { TryFromStringSlice::try_from_slice(&key) };
232+
let value: DeltaResult<String> = unsafe { TryFromStringSlice::try_from_slice(&value) };
233+
collected_metadata.insert(key.unwrap(), value.unwrap());
234+
Box::leak(collected_metadata);
235+
}
236+
237+
// Visit all (user) domain metadata
238+
let res = unsafe {
239+
ok_or_panic(visit_domain_metadata(
240+
snapshot.shallow_copy(),
241+
engine.shallow_copy(),
242+
Some(NonNull::new_unchecked(visitor_state_ptr).cast()),
243+
visitor,
244+
))
245+
};
246+
247+
// Confirm visitor picked up all entries in map
248+
let collected_metadata = unsafe { Box::from_raw(visitor_state_ptr) };
249+
assert!(res);
250+
assert!(collected_metadata.get("domain1").is_none());
251+
assert!(collected_metadata.get("delta.domain3").is_none());
252+
assert_eq!(
253+
collected_metadata.get("domain2").unwrap(),
254+
"domain2_commit1"
255+
);
256+
163257
unsafe { free_snapshot(snapshot) }
164258
unsafe { free_engine(engine) }
165259

kernel/src/actions/domain_metadata.rs

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,15 @@
44
//! This module exposes the ability to read a single domain from the log, or to read all domains
55
//! from the log at once.
66
7-
use std::collections::HashMap;
8-
use std::sync::{Arc, LazyLock};
9-
107
use crate::actions::get_log_domain_metadata_schema;
118
use crate::actions::visitors::DomainMetadataVisitor;
129
use crate::actions::{DomainMetadata, DOMAIN_METADATA_NAME};
1310
use crate::log_replay::ActionsBatch;
1411
use crate::log_segment::LogSegment;
1512
use crate::{DeltaResult, Engine, Expression as Expr, PredicateRef, RowVisitor as _};
13+
use delta_kernel_derive::internal_api;
14+
use std::collections::HashMap;
15+
use std::sync::{Arc, LazyLock};
1616

1717
const DOMAIN_METADATA_DOMAIN_FIELD: &str = "domain";
1818

@@ -35,6 +35,16 @@ pub(crate) fn domain_metadata_configuration(
3535
.map(|domain_metadata| domain_metadata.configuration))
3636
}
3737

38+
#[allow(unused)]
39+
#[internal_api]
40+
pub(crate) fn all_domain_metadata_configuration(
41+
log_segment: &LogSegment,
42+
engine: &dyn Engine,
43+
) -> DeltaResult<Vec<DomainMetadata>> {
44+
let domain_metadatas = scan_domain_metadatas(log_segment, None, engine)?;
45+
Ok(domain_metadatas.into_values().collect())
46+
}
47+
3848
/// Scan the entire log for all domain metadata actions but terminate early if a specific domain
3949
/// is provided. Note that this returns the latest domain metadata for each domain, accounting for
4050
/// tombstones (removed=true) - that is, removed domain metadatas will _never_ be returned.

kernel/src/actions/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1062,14 +1062,17 @@ impl DomainMetadata {
10621062
// returns true if the domain metadata is a system-controlled domain (all domains that start
10631063
// with "delta.")
10641064
#[allow(unused)]
1065+
#[internal_api]
10651066
pub(crate) fn is_internal(&self) -> bool {
10661067
self.domain.starts_with(INTERNAL_DOMAIN_PREFIX)
10671068
}
10681069

1070+
#[internal_api]
10691071
pub(crate) fn domain(&self) -> &str {
10701072
&self.domain
10711073
}
10721074

1075+
#[internal_api]
10731076
pub(crate) fn configuration(&self) -> &str {
10741077
&self.configuration
10751078
}

kernel/src/snapshot.rs

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@
44
use std::sync::Arc;
55

66
use crate::action_reconciliation::calculate_transaction_expiration_timestamp;
7-
use crate::actions::domain_metadata::domain_metadata_configuration;
7+
use crate::actions::domain_metadata::{
8+
all_domain_metadata_configuration, domain_metadata_configuration,
9+
};
810
use crate::actions::set_transaction::SetTransactionScanner;
911
use crate::actions::INTERNAL_DOMAIN_PREFIX;
1012
use crate::checkpoint::CheckpointWriter;
@@ -24,6 +26,7 @@ use delta_kernel_derive::internal_api;
2426
mod builder;
2527
pub use builder::SnapshotBuilder;
2628

29+
use delta_kernel::actions::DomainMetadata;
2730
use tracing::debug;
2831
use url::Url;
2932

@@ -371,6 +374,19 @@ impl Snapshot {
371374
domain_metadata_configuration(self.log_segment(), domain, engine)
372375
}
373376

377+
#[allow(unused)]
378+
#[internal_api]
379+
pub(crate) fn get_all_domain_metadata(
380+
&self,
381+
engine: &dyn Engine,
382+
) -> DeltaResult<Vec<DomainMetadata>> {
383+
let all_metadata = all_domain_metadata_configuration(self.log_segment(), engine)?;
384+
Ok(all_metadata
385+
.into_iter()
386+
.filter(|domain| !domain.is_internal())
387+
.collect())
388+
}
389+
374390
/// Get the In-Commit Timestamp (ICT) for this snapshot.
375391
///
376392
/// Returns the `inCommitTimestamp` from the CommitInfo action of the commit that created this snapshot.
@@ -443,6 +459,7 @@ mod tests {
443459
use crate::parquet::arrow::ArrowWriter;
444460
use crate::path::ParsedLogPath;
445461
use crate::utils::test_utils::{assert_result_error_with_message, string_array_to_engine_data};
462+
use delta_kernel::actions::DomainMetadata;
446463

447464
/// Helper function to create a commitInfo action with optional ICT
448465
fn create_commit_info(timestamp: i64, ict: Option<i64>) -> serde_json::Value {
@@ -1152,6 +1169,8 @@ mod tests {
11521169

11531170
let snapshot = Snapshot::builder_for(url.clone()).build(&engine)?;
11541171

1172+
// Test get_domain_metadata
1173+
11551174
assert_eq!(snapshot.get_domain_metadata("domain1", &engine)?, None);
11561175
assert_eq!(
11571176
snapshot.get_domain_metadata("domain2", &engine)?,
@@ -1166,6 +1185,19 @@ mod tests {
11661185
.unwrap_err();
11671186
assert!(matches!(err, Error::Generic(msg) if
11681187
msg == "User DomainMetadata are not allowed to use system-controlled 'delta.*' domain"));
1188+
1189+
// Test get_all_domain_metadata
1190+
let mut metadata = snapshot.get_all_domain_metadata(&engine)?;
1191+
metadata.sort_by(|a, b| a.domain().cmp(b.domain()));
1192+
1193+
let mut expected = vec![
1194+
DomainMetadata::new("domain2".to_string(), "domain2_commit1".to_string()),
1195+
DomainMetadata::new("domain3".to_string(), "domain3_commit0".to_string()),
1196+
];
1197+
expected.sort_by(|a, b| a.domain().cmp(b.domain()));
1198+
1199+
assert_eq!(metadata, expected);
1200+
11691201
Ok(())
11701202
}
11711203

0 commit comments

Comments
 (0)