Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 115 additions & 0 deletions ffi/src/committer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
//! FFI for Committers

use std::ffi::c_void;

use delta_kernel::committer::{
CatalogCommitter, CommitResponse, Committer, Context, StagedCommitter,
};
use delta_kernel::{DeltaResult, Engine, Error};
use url::Url;

use crate::error::ExternResult;
use crate::handle::Handle;
use crate::{KernelStringSlice, SharedExternEngine};
use delta_kernel_ffi_macros::handle_descriptor;

/// This is an opaque pointer to external context. This allows engines to store additional metadata
/// to 'pass through' to its [`CatalogCommitCallback`].
pub type ExternContextPtr = *mut c_void;

/// FFI callback for catalog commit operations
pub type CatalogCommitCallback = extern "C" fn(

Check failure on line 21 in ffi/src/committer.rs

View workflow job for this annotation

GitHub Actions / build (macOS-latest)

`extern` fn uses type `delta_kernel::committer::CommitResponse`, which is not FFI-safe

Check failure on line 21 in ffi/src/committer.rs

View workflow job for this annotation

GitHub Actions / build (ubuntu-latest)

`extern` fn uses type `delta_kernel::committer::CommitResponse`, which is not FFI-safe

Check failure on line 21 in ffi/src/committer.rs

View workflow job for this annotation

GitHub Actions / test (ubuntu-latest)

`extern` fn uses type `CommitResponse`, which is not FFI-safe

Check failure on line 21 in ffi/src/committer.rs

View workflow job for this annotation

GitHub Actions / test (windows-latest)

`extern` fn uses type `CommitResponse`, which is not FFI-safe

Check failure on line 21 in ffi/src/committer.rs

View workflow job for this annotation

GitHub Actions / coverage

`extern` fn uses type `CommitResponse`, which is not FFI-safe
engine: Handle<SharedExternEngine>,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need to pass the engine here?

staged_commit_path: KernelStringSlice,
context: ExternContextPtr,
) -> ExternResult<CommitResponse>;

/// Handle for a mutable boxed committer that can be passed across FFI
#[handle_descriptor(target = dyn Committer, mutable = true)]
pub struct MutableCommitter;

/// Wrapper for external context - just holds an opaque pointer
#[derive(Debug)]
struct ExternContext {
ptr: ExternContextPtr,
}

// SAFETY: External code is responsible for ensuring thread safety
unsafe impl Send for ExternContext {}
unsafe impl Sync for ExternContext {}

impl Context for ExternContext {}

/// Wrapper for external catalog committer
struct ExternCatalogCommitter {
callback: CatalogCommitCallback,
engine_handle: Handle<SharedExternEngine>,
}

// SAFETY: Callback is extern "C" fn which is Send+Sync
unsafe impl Send for ExternCatalogCommitter {}
unsafe impl Sync for ExternCatalogCommitter {}

impl std::fmt::Debug for ExternCatalogCommitter {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ExternCatalogCommitter").finish()
}
}

impl CatalogCommitter for ExternCatalogCommitter {
fn commit_request(
&self,
_engine: &dyn Engine,
staged_commit_path: &Url,
context: &dyn Context,
) -> DeltaResult<CommitResponse> {
let extern_context = context
.any_ref()
.downcast_ref::<ExternContext>()
.ok_or_else(|| Error::generic("Invalid context type for external committer"))?;

let path_str = staged_commit_path.as_str();
let path_slice = unsafe { KernelStringSlice::new_unsafe(path_str) };

let engine_handle = unsafe { self.engine_handle.clone_handle() };
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i'd need to go look, but i think clone_handle() bumps the refcount, so you need to make it clear that the engine needs to free_engine after this call.

I'd prefer to no need to pass an engine at all.


// call the callback and convert result
match (self.callback)(engine_handle, path_slice, extern_context.ptr) {
ExternResult::Ok(response) => Ok(response),
ExternResult::Err(_) => Err(Error::generic("External catalog commit callback failed")),
}
}
}

/// Create a staged committer with external catalog implementation
///
/// # Safety
/// - `callback` must be a valid function pointer
/// - `context` must remain valid for the lifetime of the committer
/// - `engine` must be a valid handle
#[no_mangle]
pub unsafe extern "C" fn create_staged_committer(
callback: CatalogCommitCallback,

Check failure on line 92 in ffi/src/committer.rs

View workflow job for this annotation

GitHub Actions / build (macOS-latest)

`extern` fn uses type `delta_kernel::committer::CommitResponse`, which is not FFI-safe

Check failure on line 92 in ffi/src/committer.rs

View workflow job for this annotation

GitHub Actions / build (ubuntu-latest)

`extern` fn uses type `delta_kernel::committer::CommitResponse`, which is not FFI-safe

Check failure on line 92 in ffi/src/committer.rs

View workflow job for this annotation

GitHub Actions / test (ubuntu-latest)

`extern` fn uses type `CommitResponse`, which is not FFI-safe

Check failure on line 92 in ffi/src/committer.rs

View workflow job for this annotation

GitHub Actions / test (windows-latest)

`extern` fn uses type `CommitResponse`, which is not FFI-safe

Check failure on line 92 in ffi/src/committer.rs

View workflow job for this annotation

GitHub Actions / coverage

`extern` fn uses type `CommitResponse`, which is not FFI-safe
context: ExternContextPtr,
engine: Handle<SharedExternEngine>,
) -> Handle<MutableCommitter> {
// just double-boxing the context
let extern_context = Box::new(ExternContext { ptr: context });
let extern_committer = Box::new(ExternCatalogCommitter {
callback,
engine_handle: engine,
});

let staged_committer = StagedCommitter::new(extern_committer, extern_context);
let staged_committer: Box<dyn Committer> = Box::new(staged_committer);
staged_committer.into()
}

/// Free a committer handle
///
/// # Safety
/// Caller must pass a valid handle
#[no_mangle]
pub unsafe extern "C" fn free_committer(committer: Handle<MutableCommitter>) {
committer.drop_handle();
}
1 change: 1 addition & 0 deletions ffi/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,15 @@
pub mod ffi_tracing;
#[cfg(feature = "catalog-managed")]
pub mod log_path;
pub mod scan;

Check warning on line 47 in ffi/src/lib.rs

View workflow job for this annotation

GitHub Actions / format

Diff in /home/runner/work/delta-kernel-rs/delta-kernel-rs/ffi/src/lib.rs
pub mod schema;

#[cfg(test)]
mod ffi_test_utils;
#[cfg(feature = "test-ffi")]
pub mod test_ffi;

Check warning on line 53 in ffi/src/lib.rs

View workflow job for this annotation

GitHub Actions / format

Diff in /home/runner/work/delta-kernel-rs/delta-kernel-rs/ffi/src/lib.rs
pub mod transaction;
pub mod committer;

pub(crate) type NullableCvoid = Option<NonNull<c_void>>;

Expand Down
30 changes: 29 additions & 1 deletion ffi/src/transaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::KernelStringSlice;
use crate::{unwrap_and_parse_path_as_url, TryFromStringSlice};
use crate::{DeltaResult, ExternEngine, Snapshot, Url};
use crate::{ExclusiveEngineData, SharedExternEngine};
use delta_kernel::committer::FileSystemCommitter;
use delta_kernel::committer::{Committer, FileSystemCommitter};
use delta_kernel::transaction::{CommitResult, Transaction};
use delta_kernel_ffi_macros::handle_descriptor;

Expand Down Expand Up @@ -45,6 +45,34 @@ fn transaction_impl(
Ok(Box::new(transaction?).into())
}

/// Start a transaction with a custom committer
/// NOTE: This consumes the committer handle
///
/// # Safety
///
/// Caller is responsible for passing valid handles
#[no_mangle]
pub unsafe extern "C" fn transaction_with_committer(
path: KernelStringSlice,
engine: Handle<SharedExternEngine>,
committer: Handle<crate::committer::MutableCommitter>,
) -> ExternResult<Handle<ExclusiveTransaction>> {
let url = unsafe { unwrap_and_parse_path_as_url(path) };
let engine = unsafe { engine.as_ref() };
let committer = unsafe { committer.into_inner() };
transaction_with_committer_impl(url, engine, committer).into_extern_result(&engine)
}

fn transaction_with_committer_impl(
url: DeltaResult<Url>,
extern_engine: &dyn ExternEngine,
committer: Box<dyn Committer>,
) -> DeltaResult<Handle<ExclusiveTransaction>> {
let snapshot = Snapshot::builder_for(url?).build(extern_engine.engine().as_ref())?;
let transaction = snapshot.transaction(committer);
Ok(Box::new(transaction?).into())
}

/// # Safety
///
/// Caller is responsible for passing a valid handle.
Expand Down
47 changes: 47 additions & 0 deletions kernel/src/committer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,53 @@ impl Committer for FileSystemCommitter {
}
}

/// Marker trait to pass arbitrary context to the StagedCommitter.
pub trait Context: std::fmt::Debug + AsAny {}

#[derive(Debug)]
pub struct StagedCommitter {
catalog_committer: Box<dyn CatalogCommitter>,
context: Box<dyn Context>,
}

impl StagedCommitter {
pub fn new(catalog_committer: Box<dyn CatalogCommitter>, context: Box<dyn Context>) -> Self {
Self {
catalog_committer,
context,
}
}
}

pub trait CatalogCommitter: Send + AsAny + std::fmt::Debug {
fn commit_request(
&self,
engine: &dyn Engine,
staged_commit_path: &Url,
context: &dyn Context,
) -> DeltaResult<CommitResponse>;
}

impl Committer for StagedCommitter {
fn commit(
&self,
engine: &dyn Engine,
actions: Box<dyn Iterator<Item = DeltaResult<FilteredEngineData>> + Send + '_>,
commit_metadata: CommitMetadata,
) -> DeltaResult<CommitResponse> {
let staged_commit_path = commit_metadata.staged_commit_path()?;
engine
.json_handler()
.write_json_file(&staged_commit_path, Box::new(actions), false)?;

let committed = engine.storage_handler().head(&staged_commit_path)?;
tracing::debug!("wrote staged commit file: {:?}", committed);

self.catalog_committer
.commit_request(engine, &staged_commit_path, self.context.as_ref())
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
Loading