diff --git a/kernel/src/engine/arrow_utils.rs b/kernel/src/engine/arrow_utils.rs
index 21cb246f37..1a126842ca 100644
--- a/kernel/src/engine/arrow_utils.rs
+++ b/kernel/src/engine/arrow_utils.rs
@@ -18,7 +18,7 @@ use crate::{
 use crate::arrow::array::{
     cast::AsArray, make_array, new_null_array, Array as ArrowArray, BooleanArray, GenericListArray,
-    MapArray, OffsetSizeTrait, PrimitiveArray, RecordBatch, StringArray, StructArray,
+    MapArray, OffsetSizeTrait, PrimitiveArray, RecordBatch, RunArray, StringArray, StructArray,
 };
 use crate::arrow::buffer::NullBuffer;
 use crate::arrow::compute::concat_batches;
@@ -168,15 +168,17 @@ impl RowIndexBuilder {
 /// ensure schema compatibility, as well as `fix_nested_null_masks` to ensure that leaf columns have
 /// accurate null masks that row visitors rely on for correctness.
 /// `row_indexes` are passed through to `reorder_struct_array`.
+/// `file_location` is used to populate file metadata columns if requested.
 pub(crate) fn fixup_parquet_read<T>(
     batch: RecordBatch,
     requested_ordering: &[ReorderIndex],
     row_indexes: Option<&mut FlattenedRangeIterator>,
+    file_location: Option<&str>,
 ) -> DeltaResult<T>
 where
     StructArray: Into<T>,
 {
-    let data = reorder_struct_array(batch.into(), requested_ordering, row_indexes)?;
+    let data = reorder_struct_array(batch.into(), requested_ordering, row_indexes, file_location)?;
     let data = fix_nested_null_masks(data);
     Ok(data.into())
 }
@@ -306,6 +308,8 @@ pub(crate) enum ReorderIndexTransform {
     Missing(ArrowFieldRef),
     /// Row index column requested, compute it
     RowIndex(ArrowFieldRef),
+    /// File path column requested, populate with the file path
+    FilePath(ArrowFieldRef),
 }
 
 impl ReorderIndex {
@@ -333,14 +337,19 @@ impl ReorderIndex {
         ReorderIndex::new(index, ReorderIndexTransform::RowIndex(field))
     }
 
+    fn file_path(index: usize, field: ArrowFieldRef) -> Self {
+        ReorderIndex::new(index, ReorderIndexTransform::FilePath(field))
+    }
+
     /// Check if this reordering requires a transformation anywhere. See comment below on
     /// [`ordering_needs_transform`] to understand why this is needed.
     fn needs_transform(&self) -> bool {
         match self.transform {
-            // if we're casting, inserting null, or generating row index, we need to transform
+            // if we're casting, inserting null, or generating a row index/file path, we need to transform
            ReorderIndexTransform::Cast(_)
             | ReorderIndexTransform::Missing(_)
-            | ReorderIndexTransform::RowIndex(_) => true,
+            | ReorderIndexTransform::RowIndex(_)
+            | ReorderIndexTransform::FilePath(_) => true,
             // if our nested ordering needs a transform, we need a transform
             ReorderIndexTransform::Nested(ref children) => ordering_needs_transform(children),
             // no transform needed
@@ -595,6 +604,13 @@ fn get_indices(
                     Arc::new(field.try_into_arrow()?),
                 ));
             }
+            Some(MetadataColumnSpec::FilePath) => {
+                debug!("Inserting a file path column: {}", field.name());
+                reorder_indices.push(ReorderIndex::file_path(
+                    requested_position,
+                    Arc::new(field.try_into_arrow()?),
+                ));
+            }
             Some(metadata_spec) => {
                 return Err(Error::Generic(format!(
                     "Metadata column {metadata_spec:?} is not supported by the default parquet reader"
@@ -765,10 +781,13 @@ type FieldArrayOpt = Option<(Arc<ArrowField>, Arc<dyn ArrowArray>)>;
 ///
 /// If the requested ordering contains a [`ReorderIndexTransform::RowIndex`], `row_indexes`
 /// must not be `None` to append a row index column to the output.
+/// If the requested ordering contains a [`ReorderIndexTransform::FilePath`], `file_location`
+/// must not be `None` to append a file path column to the output.
 pub(crate) fn reorder_struct_array(
     input_data: StructArray,
     requested_ordering: &[ReorderIndex],
     mut row_indexes: Option<&mut FlattenedRangeIterator>,
+    file_location: Option<&str>,
 ) -> DeltaResult<StructArray> {
     debug!("Reordering {input_data:?} with ordering: {requested_ordering:?}");
     if !ordering_needs_transform(requested_ordering) {
@@ -806,6 +825,7 @@ pub(crate) fn reorder_struct_array(
                         struct_array,
                         children,
                         None, // Nested structures don't need row indexes since metadata columns can't be nested
+                        None, // No file_location passed since metadata columns can't be nested
                     )?);
                     // create the new field specifying the correct order for the struct
                     let new_field = Arc::new(ArrowField::new_struct(
@@ -866,6 +886,27 @@ pub(crate) fn reorder_struct_array(
                     final_fields_cols[reorder_index.index] =
                         Some((Arc::clone(field), Arc::new(row_index_array)));
                 }
+                ReorderIndexTransform::FilePath(field) => {
+                    let Some(file_path) = file_location else {
+                        return Err(Error::generic(
+                            "File path column requested but file location not provided",
+                        ));
+                    };
+                    // Use run-end encoding for efficiency since the file path is constant for all rows.
+                    // Run-end encoding stores: [run_ends: [num_rows], values: [file_path]]
+                    let run_ends = PrimitiveArray::<Int64Type>::from_iter_values([num_rows as i64]);
+                    let values = StringArray::from_iter_values([file_path]);
+                    let file_path_array = RunArray::try_new(&run_ends, &values)?;
+
+                    // Create a field with the RunEndEncoded data type to match the array
+                    let ree_field = Arc::new(ArrowField::new(
+                        field.name(),
+                        file_path_array.data_type().clone(),
+                        field.is_nullable(),
+                    ));
+                    final_fields_cols[reorder_index.index] =
+                        Some((ree_field, Arc::new(file_path_array)));
+                }
             }
         }
         let num_cols = final_fields_cols.len();
@@ -895,6 +936,7 @@ fn reorder_list(
         struct_array,
         children,
         None, // Nested structures don't need row indexes since metadata columns can't be nested
+        None, // No file_location passed since metadata columns can't be nested
     )?);
     let new_list_field = Arc::new(ArrowField::new_struct(
         list_field.name(),
@@ -930,6 +972,7 @@ fn reorder_map(
         struct_array,
         children,
         None, // Nested structures don't need row indexes since metadata columns can't be nested
+        None, // No file_location passed since metadata columns can't be nested
     )?;
     let result_fields = result_array.fields();
     let new_map_field = Arc::new(ArrowField::new_struct(
@@ -1793,7 +1836,7 @@ mod tests {
             ),
         ];
 
-        let result = reorder_struct_array(arry, &reorder, None);
+        let result = reorder_struct_array(arry, &reorder, None, None);
         assert_result_error_with_message(
             result,
             "Row index column requested but row index iterator not provided",
@@ -1816,7 +1859,7 @@ mod tests {
         #[allow(clippy::single_range_in_vec_init)]
         let mut row_indexes = vec![(0..4)].into_iter().flatten();
 
-        let ordered = reorder_struct_array(arry, &reorder, Some(&mut row_indexes)).unwrap();
+        let ordered = reorder_struct_array(arry, &reorder, Some(&mut row_indexes), None).unwrap();
         assert_eq!(ordered.column_names(), vec!["b", "row_idx"]);
 
         // Verify the row index column contains the expected values
@@ -1853,6 +1896,92 @@ mod tests {
         assert_eq!(reorder_indices, expect_reorder);
     }
 
+    #[test]
+    fn simple_file_path_field() {
+        let requested_schema = Arc::new(StructType::new_unchecked([
+            StructField::not_null("i", DataType::INTEGER),
+            StructField::create_metadata_column("_file", MetadataColumnSpec::FilePath),
+            StructField::nullable("i2", DataType::INTEGER),
+        ]));
+        let parquet_schema = Arc::new(ArrowSchema::new(vec![
+            ArrowField::new("i", ArrowDataType::Int32, false),
ArrowField::new("i2", ArrowDataType::Int32, true), + ])); + let (mask_indices, reorder_indices) = + get_requested_indices(&requested_schema, &parquet_schema).unwrap(); + let expect_mask = vec![0, 1]; + let mut arrow_file_path_field = ArrowField::new("_file", ArrowDataType::Utf8, false); + arrow_file_path_field.set_metadata(HashMap::from([( + "delta.metadataSpec".to_string(), + "_file".to_string(), + )])); + let expect_reorder = vec![ + ReorderIndex::identity(0), + ReorderIndex::identity(2), + ReorderIndex::file_path(1, Arc::new(arrow_file_path_field)), + ]; + assert_eq!(mask_indices, expect_mask); + assert_eq!(reorder_indices, expect_reorder); + } + + #[test] + fn test_reorder_struct_array_with_file_path() { + // Test that file paths work when properly provided + let arry = make_struct_array(); + let reorder = vec![ + ReorderIndex::identity(0), + ReorderIndex::file_path( + 1, + Arc::new(ArrowField::new("_file", ArrowDataType::Utf8, false)), + ), + ]; + + let file_location = "s3://bucket/path/to/file.parquet"; + let ordered = reorder_struct_array(arry, &reorder, None, Some(file_location)).unwrap(); + assert_eq!(ordered.column_names(), vec!["b", "_file"]); + + // Verify the file path column is run-end encoded and contains the expected value + let file_path_col = ordered.column(1); + + // Check it's a RunArray + let run_array = file_path_col + .as_any() + .downcast_ref::>() + .expect("Expected RunArray"); + + // Verify it has 4 logical rows (same as input) + assert_eq!(run_array.len(), 4); + + // Verify the physical representation is efficient: 1 run with value at end position 4 + let run_ends = run_array.run_ends().values(); + assert_eq!(run_ends.len(), 1, "Should have only 1 run"); + assert_eq!(run_ends[0], 4, "Run should end at position 4"); + + // Verify the value + let values = run_array.values().as_string::(); + assert_eq!(values.len(), 1, "Should have only 1 unique value"); + assert_eq!(values.value(0), file_location); + } + + #[test] + fn test_reorder_struct_array_missing_file_path() { + // Test that error occurs when file path is requested but not provided + let arry = make_struct_array(); + let reorder = vec![ + ReorderIndex::identity(0), + ReorderIndex::file_path( + 1, + Arc::new(ArrowField::new("_file", ArrowDataType::Utf8, false)), + ), + ]; + + let result = reorder_struct_array(arry, &reorder, None, None); + assert_result_error_with_message( + result, + "File path column requested but file location not provided", + ); + } + #[test] fn test_row_index_builder_no_skipping() { let row_groups = vec![ @@ -2530,7 +2659,7 @@ mod tests { fn simple_reorder_struct() { let arry = make_struct_array(); let reorder = vec![ReorderIndex::identity(1), ReorderIndex::identity(0)]; - let ordered = reorder_struct_array(arry, &reorder, None).unwrap(); + let ordered = reorder_struct_array(arry, &reorder, None, None).unwrap(); assert_eq!(ordered.column_names(), vec!["c", "b"]); } @@ -2578,7 +2707,7 @@ mod tests { ], ), ]; - let ordered = reorder_struct_array(nested, &reorder, None).unwrap(); + let ordered = reorder_struct_array(nested, &reorder, None, None).unwrap(); assert_eq!(ordered.column_names(), vec!["struct2", "struct1"]); let ordered_s2 = ordered.column(0).as_struct(); assert_eq!(ordered_s2.column_names(), vec!["b", "c", "s"]); @@ -2625,7 +2754,7 @@ mod tests { 0, vec![ReorderIndex::identity(1), ReorderIndex::identity(0)], )]; - let ordered = reorder_struct_array(struct_array, &reorder, None).unwrap(); + let ordered = reorder_struct_array(struct_array, &reorder, None, None).unwrap(); let 
         let ordered_list_col = ordered.column(0).as_list::<i32>();
         for i in 0..ordered_list_col.len() {
             let array_item = ordered_list_col.value(i);
@@ -2691,7 +2820,7 @@ mod tests {
                 ],
             ),
         ];
-        let ordered = reorder_struct_array(struct_array, &reorder, None).unwrap();
+        let ordered = reorder_struct_array(struct_array, &reorder, None, None).unwrap();
         assert_eq!(ordered.column_names(), vec!["map", "i"]);
         if let ArrowDataType::Map(field, _) = ordered.column(0).data_type() {
             if let ArrowDataType::Struct(fields) = field.data_type() {
diff --git a/kernel/src/engine/default/parquet.rs b/kernel/src/engine/default/parquet.rs
index d540b9124f..1635f663ad 100644
--- a/kernel/src/engine/default/parquet.rs
+++ b/kernel/src/engine/default/parquet.rs
@@ -278,6 +278,7 @@ impl FileOpener for ParquetOpener {
     fn open(&self, file_meta: FileMeta, _range: Option<Range<i64>>) -> DeltaResult<FileOpenFuture> {
         let path = Path::from_url_path(file_meta.location.path())?;
         let store = self.store.clone();
+        let file_location = file_meta.location.to_string();
         let batch_size = self.batch_size;
 
         // let projection = self.projection.clone();
@@ -341,7 +342,12 @@
             let stream = builder.with_batch_size(batch_size).build()?;
 
             let stream = stream.map(move |rbr| {
-                fixup_parquet_read(rbr?, &requested_ordering, row_indexes.as_mut())
+                fixup_parquet_read(
+                    rbr?,
+                    &requested_ordering,
+                    row_indexes.as_mut(),
+                    Some(&file_location),
+                )
             });
             Ok(stream.boxed())
         }))
@@ -380,10 +386,11 @@ impl FileOpener for PresignedUrlOpener {
         let predicate = self.predicate.clone();
         let limit = self.limit;
         let client = self.client.clone(); // uses Arc internally according to reqwest docs
+        let file_location = file_meta.location.to_string();
 
         Ok(Box::pin(async move {
             // fetch the file from the interweb
-            let reader = client.get(file_meta.location).send().await?.bytes().await?;
+            let reader = client.get(&file_location).send().await?.bytes().await?;
             let metadata = ArrowReaderMetadata::load(&reader, Default::default())?;
             let parquet_schema = metadata.schema();
             let (indices, requested_ordering) =
@@ -418,7 +425,12 @@
             let mut row_indexes = row_indexes.map(|rb| rb.build()).transpose()?;
             let stream = futures::stream::iter(reader);
             let stream = stream.map(move |rbr| {
-                fixup_parquet_read(rbr?, &requested_ordering, row_indexes.as_mut())
+                fixup_parquet_read(
+                    rbr?,
+                    &requested_ordering,
+                    row_indexes.as_mut(),
+                    Some(&file_location),
+                )
             });
             Ok(stream.boxed())
         }))
diff --git a/kernel/src/engine/sync/json.rs b/kernel/src/engine/sync/json.rs
index f6a50f9689..ea2855ba20 100644
--- a/kernel/src/engine/sync/json.rs
+++ b/kernel/src/engine/sync/json.rs
@@ -17,11 +17,15 @@ use crate::{
 
 pub(crate) struct SyncJsonHandler;
 
+/// Note: This function must match the signature expected by the `read_files` helper function,
+/// which is also used by `try_create_from_parquet`. The `_file_location` parameter is unused
+/// here but required to satisfy the shared function signature.
 fn try_create_from_json(
     file: File,
     _schema: SchemaRef,
     arrow_schema: ArrowSchemaRef,
     _predicate: Option<PredicateRef>,
+    _file_location: String,
 ) -> DeltaResult<impl Iterator<Item = DeltaResult<ArrowEngineData>>> {
     let json = ReaderBuilder::new(arrow_schema)
         .build(BufReader::new(file))?
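
> Reviewer note on the run-end encoding used by the new `ReorderIndexTransform::FilePath` arm in `arrow_utils.rs` above: the sketch below is a minimal, standalone illustration (it uses the `arrow` crate directly rather than the kernel's `crate::arrow` re-exports, and the `num_rows`/`file_path` bindings are illustrative stand-ins for the values the reader has at hand) of how a per-file constant string column collapses to a single physical run.

```rust
use arrow::array::{cast::AsArray, Array, PrimitiveArray, RunArray, StringArray};
use arrow::datatypes::Int64Type;
use arrow::error::ArrowError;

fn main() -> Result<(), ArrowError> {
    // Assumed inputs: the number of rows in the batch and the location of the file being read.
    let num_rows = 4usize;
    let file_path = "s3://bucket/path/to/file.parquet";

    // One run covering every row: run_ends = [num_rows], values = [file_path].
    let run_ends = PrimitiveArray::<Int64Type>::from_iter_values([num_rows as i64]);
    let values = StringArray::from_iter_values([file_path]);
    let file_col = RunArray::try_new(&run_ends, &values)?;

    // Logically there are `num_rows` entries, but only one value is physically stored.
    assert_eq!(file_col.len(), num_rows);
    assert_eq!(file_col.run_ends().values()[0], num_rows as i64);
    assert_eq!(file_col.values().as_string::<i32>().value(0), file_path);
    Ok(())
}
```

> Because the resulting column has the Arrow `RunEndEncoded` data type rather than `Utf8`, the patch also rewrites the output field to match the array (the `ree_field` above) instead of reusing the requested field as-is.
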
diff --git a/kernel/src/engine/sync/mod.rs b/kernel/src/engine/sync/mod.rs
index 10e3cc6b14..b236aa63b8 100644
--- a/kernel/src/engine/sync/mod.rs
+++ b/kernel/src/engine/sync/mod.rs
@@ -65,7 +65,7 @@ fn read_files(
 ) -> DeltaResult<FileDataReadResultIterator>
 where
     I: Iterator<Item = DeltaResult<ArrowEngineData>> + Send + 'static,
-    F: FnMut(File, SchemaRef, ArrowSchemaRef, Option<PredicateRef>) -> DeltaResult<I>
+    F: FnMut(File, SchemaRef, ArrowSchemaRef, Option<PredicateRef>, String) -> DeltaResult<I>
         + Send
         + 'static,
 {
@@ -79,6 +79,7 @@ where
         .into_iter()
         // Produces Iterator<Item = Result<Iterator<Item = Result<RecordBatch>>>>
         .map(move |file| {
+            let location_string = file.location.to_string();
             let location = file.location;
             debug!("Reading {location:#?} with schema {schema:#?} and predicate {predicate:#?}");
             let path = location
@@ -89,6 +90,7 @@ where
                 schema.clone(),
                 arrow_schema.clone(),
                 predicate.clone(),
+                location_string,
             )
         })
         // Flatten to Iterator<Item = Result<RecordBatch>>
diff --git a/kernel/src/engine/sync/parquet.rs b/kernel/src/engine/sync/parquet.rs
index d170a23343..bf937ed676 100644
--- a/kernel/src/engine/sync/parquet.rs
+++ b/kernel/src/engine/sync/parquet.rs
@@ -20,6 +20,7 @@ fn try_create_from_parquet(
     schema: SchemaRef,
     _arrow_schema: ArrowSchemaRef,
     predicate: Option<PredicateRef>,
+    file_location: String,
 ) -> DeltaResult<impl Iterator<Item = DeltaResult<ArrowEngineData>>> {
     let metadata = ArrowReaderMetadata::load(&file, Default::default())?;
     let parquet_schema = metadata.schema();
@@ -40,7 +41,14 @@ fn try_create_from_parquet(
     let mut row_indexes = row_indexes.map(|rb| rb.build()).transpose()?;
     let stream = builder.build()?;
 
-    Ok(stream.map(move |rbr| fixup_parquet_read(rbr?, &requested_ordering, row_indexes.as_mut())))
+    Ok(stream.map(move |rbr| {
+        fixup_parquet_read(
+            rbr?,
+            &requested_ordering,
+            row_indexes.as_mut(),
+            Some(&file_location),
+        )
+    }))
 }
 
 impl ParquetHandler for SyncParquetHandler {
diff --git a/kernel/src/lib.rs b/kernel/src/lib.rs
index 0f611cfbcc..624e51064f 100644
--- a/kernel/src/lib.rs
+++ b/kernel/src/lib.rs
@@ -617,6 +617,16 @@ pub trait JsonHandler: AsAny {
     ) -> DeltaResult<()>;
 }
 
+/// Reserved field IDs for metadata columns in Delta tables.
+///
+/// These field IDs are reserved and should not be used for regular table columns.
+/// They are used to provide file-level metadata as virtual columns during reads.
+pub mod reserved_field_ids {
+    /// Reserved field ID for the file name metadata column (`_file`).
+    /// This column provides the name of the Parquet file that contains each row.
+    pub const FILE_NAME: i64 = 2147483646;
+}
+
 /// Provides Parquet file related functionalities to Delta Kernel.
 ///
 /// Connectors can leverage this trait to provide their own custom
@@ -640,11 +650,65 @@ pub trait ParquetHandler: AsAny {
     /// 2. **Field Name**: If no field ID is present in the `physical_schema`'s [`StructField`] or no matching parquet field ID is found,
     ///    fall back to matching by column name
     ///
+    /// # Metadata Columns
+    ///
+    /// The ParquetHandler must support virtual metadata columns that provide additional information
+    /// about each row. These columns are not stored in the Parquet file but are generated at read time.
+    ///
+    /// ## Row Index Column
+    ///
+    /// When a column in `physical_schema` is marked as a row index metadata column (via
+    /// [`StructField::create_metadata_column`] with [`schema::MetadataColumnSpec::RowIndex`]), the
+    /// ParquetHandler must populate it with the 0-based row position within the Parquet file:
+    ///
+    /// - **Column name**: User-specified (commonly `"row_index"` or `"_metadata.row_index"`)
+    /// - **Type**: `LONG` (non-nullable)
+    /// - **Values**: Sequential integers starting at 0 for each file
+    /// - **Use case**: Track row positions for downstream processing, or internally used to compute Row IDs
+    ///
+    /// Example: A file with 5 rows would have row_index values `[0, 1, 2, 3, 4]`.
+    ///
+    /// ## File Name Column (Reserved Field ID)
+    ///
+    /// When a column in `physical_schema` has the reserved field ID
+    /// [`reserved_field_ids::FILE_NAME`] (2147483646), the ParquetHandler must populate it
+    /// with the file path/name:
+    ///
+    /// - **Column name**: `"_file"`
+    /// - **Type**: `STRING` (non-nullable)
+    /// - **Field ID**: 2147483646 (reserved)
+    /// - **Values**: The file path/URL (e.g., `"s3://bucket/path/file.parquet"`)
+    /// - **Use case**: Track which file each row came from in multi-file reads
+    ///
+    /// Example: All rows from the same file would have the same `_file` value.
+    ///
+    /// ## Metadata Column Examples
+    ///
+    /// ```rust,ignore
+    /// use delta_kernel::schema::{StructType, StructField, DataType, MetadataColumnSpec};
+    ///
+    /// // Example 1: Schema with row_index metadata column
+    /// let schema_with_row_index = StructType::try_new([
+    ///     StructField::nullable("id", DataType::INTEGER),
+    ///     StructField::create_metadata_column("row_index", MetadataColumnSpec::RowIndex),
+    ///     StructField::nullable("value", DataType::STRING),
+    /// ])?;
+    ///
+    /// // Example 2: Schema with _file metadata column (using reserved field ID)
+    /// let schema_with_file_path = StructType::try_new([
+    ///     StructField::nullable("id", DataType::INTEGER),
+    ///     StructField::create_metadata_column("_file", MetadataColumnSpec::FilePath),
+    ///     StructField::nullable("value", DataType::STRING),
+    /// ])?;
+    /// ```
+    ///
+    /// ---
+    ///
     /// If no matching Parquet column is found, `NULL` values are returned
     /// for nullable columns in `physical_schema`. For non-nullable columns, an error is returned.
     ///
     ///
-    /// ## Examples
+    /// ## Column Matching Examples
     ///
     /// Consider a `physical_schema` with the following fields:
     /// - Column 0: `"i_logical"` (integer, non-null) with metadata `"parquet.field.id": 1`
diff --git a/kernel/src/scan/state_info.rs b/kernel/src/scan/state_info.rs
index 44abf33e54..d3af4341dc 100644
--- a/kernel/src/scan/state_info.rs
+++ b/kernel/src/scan/state_info.rs
@@ -81,6 +81,9 @@ fn validate_metadata_columns<'a>(
             metadata_info.materialized_row_id_column_name = Some(row_id_col);
         }
         Some(MetadataColumnSpec::RowCommitVersion) => {}
+        Some(MetadataColumnSpec::FilePath) => {
+            // FilePath metadata column is handled by the parquet reader
+        }
         None => {}
     }
     metadata_info
@@ -169,9 +172,11 @@ impl StateInfo {
                 Some(MetadataColumnSpec::RowCommitVersion) => {
                     return Err(Error::unsupported("Row commit versions not supported"));
                 }
-                Some(MetadataColumnSpec::RowIndex) | None => {
-                    // note that RowIndex is handled in the parquet reader so we just add it as
-                    // if it's a normal physical column
+                Some(MetadataColumnSpec::RowIndex)
+                | Some(MetadataColumnSpec::FilePath)
+                | None => {
+                    // note that RowIndex and FilePath are handled in the parquet reader, so we just
+                    // add them as if they're normal physical columns
                     let physical_field = logical_field.make_physical(column_mapping_mode);
                     debug!("\n\n{logical_field:#?}\nAfter mapping: {physical_field:#?}\n\n");
                     let physical_name = physical_field.name.clone();
diff --git a/kernel/src/schema/mod.rs b/kernel/src/schema/mod.rs
index 1715d25c8b..911075f0ca 100644
--- a/kernel/src/schema/mod.rs
+++ b/kernel/src/schema/mod.rs
@@ -132,6 +132,7 @@ pub enum MetadataColumnSpec {
     RowIndex,
     RowId,
     RowCommitVersion,
+    FilePath,
 }
 
 impl MetadataColumnSpec {
@@ -141,6 +142,7 @@
             Self::RowIndex => "row_index",
             Self::RowId => "row_id",
             Self::RowCommitVersion => "row_commit_version",
+            Self::FilePath => "_file",
         }
     }
 
@@ -150,6 +152,7 @@
             Self::RowIndex => DataType::LONG,
             Self::RowId => DataType::LONG,
             Self::RowCommitVersion => DataType::LONG,
+            Self::FilePath => DataType::STRING,
         }
     }
 
@@ -159,6 +162,15 @@
             Self::RowIndex => false,
             Self::RowId => false,
             Self::RowCommitVersion => false,
+            Self::FilePath => false,
+        }
+    }
+
+    /// The reserved field ID for the specified metadata column, if any.
+    pub fn reserved_field_id(&self) -> Option<i64> {
+        match self {
+            Self::FilePath => Some(crate::reserved_field_ids::FILE_NAME),
+            _ => None,
         }
     }
 }
@@ -171,6 +183,7 @@ impl FromStr for MetadataColumnSpec {
             "row_index" => Ok(Self::RowIndex),
             "row_id" => Ok(Self::RowId),
             "row_commit_version" => Ok(Self::RowCommitVersion),
+            "_file" => Ok(Self::FilePath),
             _ => Err(Error::Schema(format!("Unknown metadata column spec: {s}"))),
         }
     }
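
> Reviewer note: to exercise the new surface end to end, here is a hedged sketch of building a read schema that requests the `_file` column. It mirrors the doc-comment example added in `lib.rs`; it assumes the public re-exports used there (`delta_kernel::schema`, `delta_kernel::reserved_field_ids`, `DeltaResult`), and the helper function name is illustrative, not part of this patch.

```rust
use delta_kernel::schema::{DataType, MetadataColumnSpec, StructField, StructType};
use delta_kernel::{reserved_field_ids, DeltaResult};

// Illustrative helper: build a read schema that asks the parquet reader to
// populate a `_file` column alongside two ordinary data columns.
fn file_path_read_schema() -> DeltaResult<StructType> {
    // "_file" round-trips through the FromStr impl added in schema/mod.rs.
    let spec: MetadataColumnSpec = "_file".parse()?;
    assert!(matches!(spec, MetadataColumnSpec::FilePath));

    // The FilePath column carries the reserved field ID 2147483646.
    assert_eq!(
        MetadataColumnSpec::FilePath.reserved_field_id(),
        Some(reserved_field_ids::FILE_NAME)
    );

    StructType::try_new([
        StructField::nullable("id", DataType::INTEGER),
        StructField::create_metadata_column("_file", MetadataColumnSpec::FilePath),
        StructField::nullable("value", DataType::STRING),
    ])
}
```

> When a schema like this reaches `ParquetOpener`, `PresignedUrlOpener`, or the sync `try_create_from_parquet`, the changes above thread the file URL through `fixup_parquet_read`, so every row in the returned batches carries its source file in `_file`.
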