From d971c50e47414d1ef88b1f94e06c86f7a59531aa Mon Sep 17 00:00:00 2001 From: Zach Schuermann Date: Fri, 21 Nov 2025 08:21:56 -0800 Subject: [PATCH 1/2] StagedCommitter --- kernel/src/committer.rs | 47 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 47 insertions(+) diff --git a/kernel/src/committer.rs b/kernel/src/committer.rs index 58748dbebe..48ee69fa78 100644 --- a/kernel/src/committer.rs +++ b/kernel/src/committer.rs @@ -168,6 +168,53 @@ impl Committer for FileSystemCommitter { } } +/// Marker trait to pass arbitrary context to the StagedCommitter. +pub trait Context: std::fmt::Debug + AsAny {} + +#[derive(Debug)] +pub struct StagedCommitter { + catalog_committer: Box, + context: Box, +} + +impl StagedCommitter { + pub fn new(catalog_committer: Box, context: Box) -> Self { + Self { + catalog_committer, + context, + } + } +} + +pub trait CatalogCommitter: Send + AsAny + std::fmt::Debug { + fn commit_request( + &self, + engine: &dyn Engine, + staged_commit_path: &Url, + context: &dyn Context, + ) -> DeltaResult; +} + +impl Committer for StagedCommitter { + fn commit( + &self, + engine: &dyn Engine, + actions: Box> + Send + '_>, + commit_metadata: CommitMetadata, + ) -> DeltaResult { + let staged_commit_path = commit_metadata.staged_commit_path()?; + engine + .json_handler() + .write_json_file(&staged_commit_path, Box::new(actions), false)?; + + let committed = engine.storage_handler().head(&staged_commit_path)?; + tracing::debug!("wrote staged commit file: {:?}", committed); + + self.catalog_committer + .commit_request(engine, &staged_commit_path, self.context.as_ref()) + } +} + #[cfg(test)] mod tests { use super::*; From 2cb34ba2602c03a40da0c22d4bc4efa447c2e609 Mon Sep 17 00:00:00 2001 From: Zach Schuermann Date: Tue, 2 Dec 2025 11:14:51 -0800 Subject: [PATCH 2/2] StagedCommitter FFI --- ffi/src/committer.rs | 115 +++++++++++++++++++++++++++++++++++++ ffi/src/lib.rs | 1 + ffi/src/transaction/mod.rs | 30 +++++++++- 3 files changed, 145 insertions(+), 1 deletion(-) create mode 100644 ffi/src/committer.rs diff --git a/ffi/src/committer.rs b/ffi/src/committer.rs new file mode 100644 index 0000000000..fc42a34d82 --- /dev/null +++ b/ffi/src/committer.rs @@ -0,0 +1,115 @@ +//! FFI for Committers + +use std::ffi::c_void; + +use delta_kernel::committer::{ + CatalogCommitter, CommitResponse, Committer, Context, StagedCommitter, +}; +use delta_kernel::{DeltaResult, Engine, Error}; +use url::Url; + +use crate::error::ExternResult; +use crate::handle::Handle; +use crate::{KernelStringSlice, SharedExternEngine}; +use delta_kernel_ffi_macros::handle_descriptor; + +/// This is an opaque pointer to external context. This allows engines to store additional metadata +/// to 'pass through' to its [`CatalogCommitCallback`]. +pub type ExternContextPtr = *mut c_void; + +/// FFI callback for catalog commit operations +pub type CatalogCommitCallback = extern "C" fn( + engine: Handle, + staged_commit_path: KernelStringSlice, + context: ExternContextPtr, +) -> ExternResult; + +/// Handle for a mutable boxed committer that can be passed across FFI +#[handle_descriptor(target = dyn Committer, mutable = true)] +pub struct MutableCommitter; + +/// Wrapper for external context - just holds an opaque pointer +#[derive(Debug)] +struct ExternContext { + ptr: ExternContextPtr, +} + +// SAFETY: External code is responsible for ensuring thread safety +unsafe impl Send for ExternContext {} +unsafe impl Sync for ExternContext {} + +impl Context for ExternContext {} + +/// Wrapper for external catalog committer +struct ExternCatalogCommitter { + callback: CatalogCommitCallback, + engine_handle: Handle, +} + +// SAFETY: Callback is extern "C" fn which is Send+Sync +unsafe impl Send for ExternCatalogCommitter {} +unsafe impl Sync for ExternCatalogCommitter {} + +impl std::fmt::Debug for ExternCatalogCommitter { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("ExternCatalogCommitter").finish() + } +} + +impl CatalogCommitter for ExternCatalogCommitter { + fn commit_request( + &self, + _engine: &dyn Engine, + staged_commit_path: &Url, + context: &dyn Context, + ) -> DeltaResult { + let extern_context = context + .any_ref() + .downcast_ref::() + .ok_or_else(|| Error::generic("Invalid context type for external committer"))?; + + let path_str = staged_commit_path.as_str(); + let path_slice = unsafe { KernelStringSlice::new_unsafe(path_str) }; + + let engine_handle = unsafe { self.engine_handle.clone_handle() }; + + // call the callback and convert result + match (self.callback)(engine_handle, path_slice, extern_context.ptr) { + ExternResult::Ok(response) => Ok(response), + ExternResult::Err(_) => Err(Error::generic("External catalog commit callback failed")), + } + } +} + +/// Create a staged committer with external catalog implementation +/// +/// # Safety +/// - `callback` must be a valid function pointer +/// - `context` must remain valid for the lifetime of the committer +/// - `engine` must be a valid handle +#[no_mangle] +pub unsafe extern "C" fn create_staged_committer( + callback: CatalogCommitCallback, + context: ExternContextPtr, + engine: Handle, +) -> Handle { + // just double-boxing the context + let extern_context = Box::new(ExternContext { ptr: context }); + let extern_committer = Box::new(ExternCatalogCommitter { + callback, + engine_handle: engine, + }); + + let staged_committer = StagedCommitter::new(extern_committer, extern_context); + let staged_committer: Box = Box::new(staged_committer); + staged_committer.into() +} + +/// Free a committer handle +/// +/// # Safety +/// Caller must pass a valid handle +#[no_mangle] +pub unsafe extern "C" fn free_committer(committer: Handle) { + committer.drop_handle(); +} diff --git a/ffi/src/lib.rs b/ffi/src/lib.rs index b300e59f5e..06bb59df87 100644 --- a/ffi/src/lib.rs +++ b/ffi/src/lib.rs @@ -52,6 +52,7 @@ mod ffi_test_utils; #[cfg(feature = "test-ffi")] pub mod test_ffi; pub mod transaction; +pub mod committer; pub(crate) type NullableCvoid = Option>; diff --git a/ffi/src/transaction/mod.rs b/ffi/src/transaction/mod.rs index 80b0f9c8bd..0ee32ed619 100644 --- a/ffi/src/transaction/mod.rs +++ b/ffi/src/transaction/mod.rs @@ -8,7 +8,7 @@ use crate::KernelStringSlice; use crate::{unwrap_and_parse_path_as_url, TryFromStringSlice}; use crate::{DeltaResult, ExternEngine, Snapshot, Url}; use crate::{ExclusiveEngineData, SharedExternEngine}; -use delta_kernel::committer::FileSystemCommitter; +use delta_kernel::committer::{Committer, FileSystemCommitter}; use delta_kernel::transaction::{CommitResult, Transaction}; use delta_kernel_ffi_macros::handle_descriptor; @@ -45,6 +45,34 @@ fn transaction_impl( Ok(Box::new(transaction?).into()) } +/// Start a transaction with a custom committer +/// NOTE: This consumes the committer handle +/// +/// # Safety +/// +/// Caller is responsible for passing valid handles +#[no_mangle] +pub unsafe extern "C" fn transaction_with_committer( + path: KernelStringSlice, + engine: Handle, + committer: Handle, +) -> ExternResult> { + let url = unsafe { unwrap_and_parse_path_as_url(path) }; + let engine = unsafe { engine.as_ref() }; + let committer = unsafe { committer.into_inner() }; + transaction_with_committer_impl(url, engine, committer).into_extern_result(&engine) +} + +fn transaction_with_committer_impl( + url: DeltaResult, + extern_engine: &dyn ExternEngine, + committer: Box, +) -> DeltaResult> { + let snapshot = Snapshot::builder_for(url?).build(extern_engine.engine().as_ref())?; + let transaction = snapshot.transaction(committer); + Ok(Box::new(transaction?).into()) +} + /// # Safety /// /// Caller is responsible for passing a valid handle.