feat(core): Add support for _file column
#1824
base: main
Changes from 26 commits
@@ -20,15 +20,17 @@ use std::sync::Arc;
 use arrow_array::{
     Array as ArrowArray, ArrayRef, BinaryArray, BooleanArray, Date32Array, Float32Array,
-    Float64Array, Int32Array, Int64Array, NullArray, RecordBatch, RecordBatchOptions, StringArray,
+    Float64Array, Int32Array, Int64Array, NullArray, RecordBatch, RecordBatchOptions, RunArray,
+    StringArray,
 };
 use arrow_cast::cast;
 use arrow_schema::{
-    DataType, FieldRef, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, SchemaRef,
+    DataType, Field, FieldRef, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, SchemaRef,
 };
 use parquet::arrow::PARQUET_FIELD_ID_META_KEY;

 use crate::arrow::schema_to_arrow_schema;
+use crate::metadata_columns::get_metadata_column_name;
 use crate::spec::{Literal, PrimitiveLiteral, Schema as IcebergSchema};
 use crate::{Error, ErrorKind, Result};
@@ -111,6 +113,9 @@ enum SchemaComparison {
 pub(crate) struct RecordBatchTransformer {
     snapshot_schema: Arc<IcebergSchema>,
     projected_iceberg_field_ids: Vec<i32>,
+    // Pre-computed constant field information: field_id -> (arrow_type, value)
+    // Avoids duplicate lookups and type conversions during batch processing
+    constant_fields: HashMap<i32, (DataType, PrimitiveLiteral)>,

     // BatchTransform gets lazily constructed based on the schema of
     // the first RecordBatch we receive from the file
@@ -129,10 +134,23 @@ impl RecordBatchTransformer {
         Self {
             snapshot_schema,
             projected_iceberg_field_ids,
+            constant_fields: HashMap::new(),
             batch_transform: None,
         }
     }

+    /// Add a constant value for a specific field ID.
+    /// This is used for virtual/metadata fields like _file that have constant values per batch.
+    ///
+    /// # Arguments
+    /// * `field_id` - The field ID to associate with the constant
+    /// * `value` - The constant value for this field
+    pub(crate) fn with_constant(mut self, field_id: i32, value: PrimitiveLiteral) -> Result<Self> {
+        let arrow_type = Self::primitive_literal_to_arrow_type(&value)?;
+        self.constant_fields.insert(field_id, (arrow_type, value));
+        Ok(self)
+    }
+
     pub(crate) fn process_record_batch(
         &mut self,
         record_batch: RecordBatch,
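For illustration, a minimal sketch of how the crate's Arrow reader might wire a data file's path into the transformer via `with_constant`. The constructor name `build` and the surrounding variable names are assumptions for this sketch, not part of the diff:

```rust
use std::sync::Arc;

use crate::metadata_columns::RESERVED_FIELD_ID_FILE;
use crate::spec::{PrimitiveLiteral, Schema as IcebergSchema};
use crate::Result;

// Hypothetical crate-internal call site: when the reader opens one data file, it
// registers that file's path as the constant value for the reserved `_file` field ID.
fn transformer_for_file(
    snapshot_schema: Arc<IcebergSchema>,
    projected_field_ids: &[i32],
    data_file_path: &str,
) -> Result<RecordBatchTransformer> {
    RecordBatchTransformer::build(snapshot_schema, projected_field_ids).with_constant(
        RESERVED_FIELD_ID_FILE,
        PrimitiveLiteral::String(data_file_path.to_string()),
    )
}
```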
@@ -167,6 +185,7 @@
                 record_batch.schema_ref(),
                 self.snapshot_schema.as_ref(),
                 &self.projected_iceberg_field_ids,
+                &self.constant_fields,
             )?);

             self.process_record_batch(record_batch)?
@@ -185,6 +204,7 @@
         source_schema: &ArrowSchemaRef,
         snapshot_schema: &IcebergSchema,
         projected_iceberg_field_ids: &[i32],
+        constant_fields: &HashMap<i32, (DataType, PrimitiveLiteral)>,
     ) -> Result<BatchTransform> {
         let mapped_unprojected_arrow_schema = Arc::new(schema_to_arrow_schema(snapshot_schema)?);
         let field_id_to_mapped_schema_map =
@@ -195,11 +215,25 @@
         let fields: Result<Vec<_>> = projected_iceberg_field_ids
             .iter()
             .map(|field_id| {
-                Ok(field_id_to_mapped_schema_map
-                    .get(field_id)
-                    .ok_or(Error::new(ErrorKind::Unexpected, "field not found"))?
-                    .0
-                    .clone())
+                // Check if this is a constant/virtual field (pre-computed)
+                if let Some((arrow_type, _)) = constant_fields.get(field_id) {
+                    // Create a field for the virtual column
+                    let field_name = get_metadata_column_name(*field_id)?;
+                    Ok(Arc::new(
+                        Field::new(field_name, arrow_type.clone(), false).with_metadata(
+                            HashMap::from([(
+                                PARQUET_FIELD_ID_META_KEY.to_string(),
+                                field_id.to_string(),
+                            )]),
+                        ),
+                    ))
+                } else {
+                    Ok(field_id_to_mapped_schema_map
+                        .get(field_id)
+                        .ok_or(Error::new(ErrorKind::Unexpected, "field not found"))?
+                        .0
+                        .clone())
+                }
             })
             .collect();
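For reference, a standalone sketch of the Arrow field this branch produces for `_file` once the constant's Arrow type resolves to the run-end encoded string type added later in this diff. It depends only on `arrow_schema` and `parquet`, and the asserted values are illustrative:

```rust
use std::collections::HashMap;
use std::sync::Arc;

use arrow_schema::{DataType, Field};
use parquet::arrow::PARQUET_FIELD_ID_META_KEY;

fn main() {
    // The `_file` entry in the target schema ends up equivalent to this field:
    // a non-nullable run-end encoded string column tagged with the reserved field ID.
    let run_ends = Arc::new(Field::new("run_ends", DataType::Int32, false));
    let values = Arc::new(Field::new("values", DataType::Utf8, true));
    let file_field = Field::new("_file", DataType::RunEndEncoded(run_ends, values), false)
        .with_metadata(HashMap::from([(
            PARQUET_FIELD_ID_META_KEY.to_string(),
            "2147483646".to_string(),
        )]));
    assert_eq!(file_field.name(), "_file");
}
```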
@@ -214,6 +248,7 @@
                     snapshot_schema,
                     projected_iceberg_field_ids,
                     field_id_to_mapped_schema_map,
+                    constant_fields,
                 )?,
                 target_schema,
             }),
@@ -270,11 +305,21 @@
         snapshot_schema: &IcebergSchema,
         projected_iceberg_field_ids: &[i32],
         field_id_to_mapped_schema_map: HashMap<i32, (FieldRef, usize)>,
+        constant_fields: &HashMap<i32, (DataType, PrimitiveLiteral)>,
     ) -> Result<Vec<ColumnSource>> {
         let field_id_to_source_schema_map =
             Self::build_field_id_to_arrow_schema_map(source_schema)?;

         projected_iceberg_field_ids.iter().map(|field_id|{
+            // Check if this is a constant/virtual field (pre-computed)
+            if let Some((arrow_type, value)) = constant_fields.get(field_id) {
+                // This is a virtual field - add it with the constant value
+                return Ok(ColumnSource::Add {
+                    value: Some(value.clone()),
+                    target_type: arrow_type.clone(),
+                });
+            }
+
             let (target_field, _) = field_id_to_mapped_schema_map.get(field_id).ok_or(
                 Error::new(ErrorKind::Unexpected, "could not find field in schema")
             )?;
@@ -429,6 +474,27 @@
                 let vals: Vec<Option<f64>> = vec![None; num_rows];
                 Arc::new(Float64Array::from(vals))
             }
+            (DataType::RunEndEncoded(_, _), Some(PrimitiveLiteral::String(value))) => {
+                // Create Run-End Encoded array for constant string values (e.g., file paths)
+                // This is more memory-efficient than repeating the same value for every row
+                let run_ends = if num_rows == 0 {
+                    Int32Array::from(Vec::<i32>::new())
+                } else {
+                    Int32Array::from(vec![num_rows as i32])
+                };
+                let values = if num_rows == 0 {
+                    StringArray::from(Vec::<&str>::new())
+                } else {
+                    StringArray::from(vec![value.as_str()])
+                };
+                Arc::new(RunArray::try_new(&run_ends, &values).map_err(|e| {
+                    Error::new(
+                        ErrorKind::Unexpected,
+                        "Failed to create RunArray for constant string",
+                    )
+                    .with_source(e)
+                })?)
+            }
             (DataType::Utf8, Some(PrimitiveLiteral::String(value))) => {
                 Arc::new(StringArray::from(vec![value.clone(); num_rows]))
             }
@@ -452,6 +518,35 @@
             }
         })
     }
+
+    /// Converts a PrimitiveLiteral to its corresponding Arrow DataType.
+    /// This is used for virtual fields to determine the Arrow type based on the constant value.
+    fn primitive_literal_to_arrow_type(literal: &PrimitiveLiteral) -> Result<DataType> {
+        Ok(match literal {
+            PrimitiveLiteral::Boolean(_) => DataType::Boolean,
+            PrimitiveLiteral::Int(_) => DataType::Int32,
+            PrimitiveLiteral::Long(_) => DataType::Int64,
+            PrimitiveLiteral::Float(_) => DataType::Float32,
+            PrimitiveLiteral::Double(_) => DataType::Float64,
+            PrimitiveLiteral::String(_) => {
+                // Use Run-End Encoding for constant strings (memory efficient)
+                let run_ends_field = Arc::new(Field::new("run_ends", DataType::Int32, false));
+                // Note that this is nullable, as Arrow expects this when building the
+                // final Arrow schema with `RunArray::try_new`.
+                let values_field = Arc::new(Field::new("values", DataType::Utf8, true));
+                DataType::RunEndEncoded(run_ends_field, values_field)
+            }
+            PrimitiveLiteral::Binary(_) => DataType::Binary,
+            PrimitiveLiteral::Int128(_) => DataType::Decimal128(38, 0),
+            PrimitiveLiteral::UInt128(_) => DataType::Decimal128(38, 0),
+            PrimitiveLiteral::AboveMax | PrimitiveLiteral::BelowMin => {
+                return Err(Error::new(
+                    ErrorKind::Unexpected,
+                    "Cannot create arrow type for AboveMax/BelowMin literal",
+                ));
+            }
+        })
+    }
 }

 #[cfg(test)]
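As a standalone illustration of the run-end encoded representation the new `DataType::RunEndEncoded` arm produces, the following sketch depends only on `arrow_array`; the path and row count are made-up values:

```rust
use arrow_array::{Array, Int32Array, RunArray, StringArray};

fn main() {
    // One logical column of 4 rows, all equal to the same file path, stored as a
    // single run: run_ends = [4], values = ["..."]. This mirrors what the new
    // DataType::RunEndEncoded arm builds for the `_file` constant.
    let num_rows: i32 = 4;
    let run_ends = Int32Array::from(vec![num_rows]);
    let values = StringArray::from(vec!["s3://bucket/table/data/file-001.parquet"]);
    let ree = RunArray::try_new(&run_ends, &values).expect("valid run-end encoded array");

    // The logical length is the last run end, not the number of stored values.
    assert_eq!(ree.len(), num_rows as usize);
}
```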
@@ -96,4 +96,5 @@ mod utils;
 pub mod writer;

 mod delete_vector;
+pub mod metadata_columns;
 pub mod puffin;
@@ -0,0 +1,88 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Metadata columns (virtual/reserved fields) for Iceberg tables.
+//!
+//! This module defines metadata columns that can be requested in projections
+//! but are not stored in data files. Instead, they are computed on-the-fly
+//! during reading. Examples include the _file column (file path) and future
+//! columns like partition values or row numbers.
+
+use crate::{Error, ErrorKind, Result};
+
+/// Reserved field ID for the file path (_file) column per Iceberg spec
+pub const RESERVED_FIELD_ID_FILE: i32 = 2147483646;
+
+/// Reserved column name for the file path metadata column
+pub const RESERVED_COL_NAME_FILE: &str = "_file";
+
+/// Returns the column name for a metadata field ID.
+///
+/// # Arguments
+/// * `field_id` - The metadata field ID
+///
+/// # Returns
+/// The name of the metadata column, or an error if the field ID is not recognized
+pub fn get_metadata_column_name(field_id: i32) -> Result<&'static str> {
+    match field_id {
+        RESERVED_FIELD_ID_FILE => Ok(RESERVED_COL_NAME_FILE),
+        _ => Err(Error::new(
+            ErrorKind::Unexpected,
+            format!("Unknown/unsupported metadata field ID: {field_id}"),
+        )),
+    }
+}
+
+/// Returns the field ID for a metadata column name.
+///
+/// # Arguments
+/// * `column_name` - The metadata column name
+///
+/// # Returns
+/// The field ID of the metadata column, or an error if the column name is not recognized
+pub fn get_metadata_field_id(column_name: &str) -> Result<i32> {
+    match column_name {
+        RESERVED_COL_NAME_FILE => Ok(RESERVED_FIELD_ID_FILE),
+        _ => Err(Error::new(
+            ErrorKind::Unexpected,
+            format!("Unknown/unsupported metadata column name: {column_name}"),
+        )),
+    }
+}
+
+/// Checks if a field ID is a metadata field.
+///
+/// # Arguments
+/// * `field_id` - The field ID to check
+///
+/// # Returns
+/// `true` if the field ID is a (currently supported) metadata field, `false` otherwise
+pub fn is_metadata_field(field_id: i32) -> bool {
+    field_id == RESERVED_FIELD_ID_FILE
+    // Additional metadata fields can be checked here in the future
+}
+
+/// Checks if a column name is a metadata column.
+///
+/// # Arguments
+/// * `column_name` - The column name to check
+///
+/// # Returns
+/// `true` if the column name is a metadata column, `false` otherwise
+pub fn is_metadata_column_name(column_name: &str) -> bool {
+    get_metadata_field_id(column_name).is_ok()
+}
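To show the shape of the new public helpers, here is a small usage sketch, assuming the crate is consumed as `iceberg`; the projection list is illustrative:

```rust
use iceberg::metadata_columns::{
    get_metadata_column_name, get_metadata_field_id, is_metadata_column_name,
    RESERVED_FIELD_ID_FILE,
};

fn main() -> iceberg::Result<()> {
    // Translate between the reserved column name and its field ID in both directions.
    assert_eq!(get_metadata_field_id("_file")?, RESERVED_FIELD_ID_FILE);
    assert_eq!(get_metadata_column_name(RESERVED_FIELD_ID_FILE)?, "_file");

    // Split a requested projection into metadata columns and regular columns.
    let projection = ["id", "name", "_file"];
    let (metadata, regular): (Vec<&str>, Vec<&str>) = projection
        .iter()
        .copied()
        .partition(|col| is_metadata_column_name(col));
    assert_eq!(metadata, vec!["_file"]);
    assert_eq!(regular, vec!["id", "name"]);
    Ok(())
}
```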
Review discussion:

"We have `Datum` type exactly for `DataType` + `PrimitiveLiteral`."

"I think `Datum` is rather `PrimitiveType` + `PrimitiveLiteral`, but here we have the Arrow `DataType`."

"What's the benefit of using arrow's `DataType` here?"

"Ah, I see now what you mean. I changed the map to use `Datum` and changed the static file field accordingly."
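A hedged sketch of the direction described in the last comment: keying the constant map on `Datum` (Iceberg `PrimitiveType` plus `PrimitiveLiteral`) rather than a tuple of Arrow `DataType` and `PrimitiveLiteral`. The map variable name and the file path are illustrative:

```rust
use std::collections::HashMap;

use iceberg::spec::Datum;

fn main() {
    // Constant values keyed by field ID; 2147483646 is the reserved `_file` field ID
    // defined in the new metadata_columns module.
    let mut constant_fields: HashMap<i32, Datum> = HashMap::new();
    constant_fields.insert(
        2147483646,
        Datum::string("s3://bucket/table/data/file-001.parquet"),
    );

    for (field_id, datum) in &constant_fields {
        // The Arrow type for the constant column can then be derived from the datum's
        // Iceberg primitive type at the point where the column is materialized.
        println!("field {field_id} -> {datum:?}");
    }
}
```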